summaryrefslogtreecommitdiffstats
path: root/ml/ml.cc
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2023-03-21 11:24:41 +0200
committerGitHub <noreply@github.com>2023-03-21 11:24:41 +0200
commit5046e034212c008557dd014196b6f6204eda24b2 (patch)
tree669a20632fe2fc1127236d4bdcb923a2896d4b56 /ml/ml.cc
parenta166bbd092a304d476bd80dba37c932f4956ee3a (diff)
Use static thread-pool for training. (#14702)
* Use static thread-pool for training. * Add missing function definition * disable training stats chart * Add config option to explicitly enable ML stats charts. --------- Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Diffstat (limited to 'ml/ml.cc')
-rw-r--r--ml/ml.cc546
1 files changed, 273 insertions, 273 deletions
diff --git a/ml/ml.cc b/ml/ml.cc
index cf9ea379a6..c7d4671c04 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -8,14 +8,13 @@
#include "ad_charts.h"
-typedef struct {
- calculated_number_t *training_cns;
- calculated_number_t *scratch_training_cns;
-
- std::vector<DSample> training_samples;
-} ml_tls_data_t;
-
-static thread_local ml_tls_data_t tls_data;
+#define WORKER_TRAIN_QUEUE_POP 0
+#define WORKER_TRAIN_ACQUIRE_DIMENSION 1
+#define WORKER_TRAIN_QUERY 2
+#define WORKER_TRAIN_KMEANS 3
+#define WORKER_TRAIN_UPDATE_MODELS 4
+#define WORKER_TRAIN_RELEASE_DIMENSION 5
+#define WORKER_TRAIN_UPDATE_HOST 6
/*
* Functions to convert enums to strings
@@ -264,7 +263,14 @@ ml_queue_pop(ml_queue_t *q)
{
netdata_mutex_lock(&q->mutex);
- ml_training_request_t req = { NULL, NULL, 0, 0, 0 };
+ ml_training_request_t req = {
+ NULL, // host_id
+ NULL, // chart id
+ NULL, // dimension id
+ 0, // current time
+ 0, // first entry
+ 0 // last entry
+ };
while (q->internal.empty()) {
pthread_cond_wait(&q->cond_var, &q->mutex);
@@ -307,7 +313,7 @@ ml_queue_signal(ml_queue_t *q)
*/
static std::pair<calculated_number_t *, ml_training_response_t>
-ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t &training_request)
+ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
{
ml_training_response_t training_response = {};
@@ -351,7 +357,7 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
STORAGE_PRIORITY_BEST_EFFORT);
size_t idx = 0;
- memset(tls_data.training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
+ memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN();
while (!ops->is_finished(&handle)) {
@@ -368,11 +374,11 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
training_response.db_after_t = timestamp;
training_response.db_before_t = timestamp;
- tls_data.training_cns[idx] = value;
- last_value = tls_data.training_cns[idx];
+ training_thread->training_cns[idx] = value;
+ last_value = training_thread->training_cns[idx];
training_response.collected_values++;
} else
- tls_data.training_cns[idx] = last_value;
+ training_thread->training_cns[idx] = last_value;
idx++;
}
@@ -387,20 +393,21 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
}
// Find first non-NaN value.
- for (idx = 0; std::isnan(tls_data.training_cns[idx]); idx++, training_response.total_values--) { }
+ for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { }
// Overwrite NaN values.
if (idx != 0)
- memmove(tls_data.training_cns, &tls_data.training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
+ memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
training_response.result = TRAINING_RESULT_OK;
- return { tls_data.training_cns, training_response };
+ return { training_thread->training_cns, training_response };
}
static enum ml_training_result
-ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &training_request)
+ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
{
- auto P = ml_dimension_calculated_numbers(dim, training_request);
+ worker_is_busy(WORKER_TRAIN_QUERY);
+ auto P = ml_dimension_calculated_numbers(training_thread, dim, training_request);
ml_training_response_t training_response = P.second;
if (training_response.result != TRAINING_RESULT_OK) {
@@ -429,15 +436,16 @@ ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &train
}
// compute kmeans
+ worker_is_busy(WORKER_TRAIN_KMEANS);
{
- memcpy(tls_data.scratch_training_cns, tls_data.training_cns,
+ memcpy(training_thread->scratch_training_cns, training_thread->training_cns,
training_response.total_values * sizeof(calculated_number_t));
ml_features_t features = {
Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
- tls_data.scratch_training_cns, training_response.total_values,
- tls_data.training_cns, training_response.total_values,
- tls_data.training_samples
+ training_thread->scratch_training_cns, training_response.total_values,
+ training_thread->training_cns, training_response.total_values,
+ training_thread->training_samples
};
ml_features_preprocess(&features);
@@ -446,6 +454,7 @@ ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &train
}
// update kmeans models
+ worker_is_busy(WORKER_TRAIN_UPDATE_MODELS);
{
netdata_mutex_lock(&dim->mutex);
@@ -497,11 +506,16 @@ ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time)
}
if (schedule_for_training) {
- ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
ml_training_request_t req = {
- string_dup(dim->rd->rrdset->id), string_dup(dim->rd->id),
- curr_time, rrddim_first_entry_s(dim->rd), rrddim_last_entry_s(dim->rd),
+ string_dup(dim->rd->rrdset->rrdhost->hostname),
+ string_dup(dim->rd->rrdset->id),
+ string_dup(dim->rd->id),
+ curr_time,
+ rrddim_first_entry_s(dim->rd),
+ rrddim_last_entry_s(dim->rd),
};
+
+ ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
ml_queue_push(host->training_queue, req);
}
}
@@ -677,7 +691,6 @@ ml_host_detect_once(ml_host_t *host)
host->mls = {};
ml_machine_learning_stats_t mls_copy = {};
- ml_training_stats_t ts_copy = {};
{
netdata_mutex_lock(&host->mutex);
@@ -721,54 +734,14 @@ ml_host_detect_once(ml_host_t *host)
mls_copy = host->mls;
- /*
- * training stats
- */
- ts_copy = host->ts;
-
- host->ts.queue_size = 0;
- host->ts.num_popped_items = 0;
-
- host->ts.allotted_ut = 0;
- host->ts.consumed_ut = 0;
- host->ts.remaining_ut = 0;
-
- host->ts.training_result_ok = 0;
- host->ts.training_result_invalid_query_time_range = 0;
- host->ts.training_result_not_enough_collected_values = 0;
- host->ts.training_result_null_acquired_dimension = 0;
- host->ts.training_result_chart_under_replication = 0;
-
netdata_mutex_unlock(&host->mutex);
}
- // Calc the avg values
- if (ts_copy.num_popped_items) {
- ts_copy.queue_size /= ts_copy.num_popped_items;
- ts_copy.allotted_ut /= ts_copy.num_popped_items;
- ts_copy.consumed_ut /= ts_copy.num_popped_items;
- ts_copy.remaining_ut /= ts_copy.num_popped_items;
-
- ts_copy.training_result_ok /= ts_copy.num_popped_items;
- ts_copy.training_result_invalid_query_time_range /= ts_copy.num_popped_items;
- ts_copy.training_result_not_enough_collected_values /= ts_copy.num_popped_items;
- ts_copy.training_result_null_acquired_dimension /= ts_copy.num_popped_items;
- ts_copy.training_result_chart_under_replication /= ts_copy.num_popped_items;
- } else {
- ts_copy.queue_size = 0;
- ts_copy.allotted_ut = 0;
- ts_copy.consumed_ut = 0;
- ts_copy.remaining_ut = 0;
- }
-
worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
ml_update_dimensions_chart(host, mls_copy);
worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
ml_update_host_and_detection_rate_charts(host, host->host_anomaly_rate * 10000.0);
-
- worker_is_busy(WORKER_JOB_DETECTION_STATS);
- ml_update_training_statistics_chart(host, ts_copy);
}
typedef struct {
@@ -777,18 +750,21 @@ typedef struct {
} ml_acquired_dimension_t;
static ml_acquired_dimension_t
-ml_acquired_dimension_get(RRDHOST *rh, STRING *chart_id, STRING *dimension_id)
+ml_acquired_dimension_get(STRING *host_id, STRING *chart_id, STRING *dimension_id)
{
RRDDIM_ACQUIRED *acq_rd = NULL;
ml_dimension_t *dim = NULL;
- RRDSET *rs = rrdset_find(rh, string2str(chart_id));
- if (rs) {
- acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
- if (acq_rd) {
- RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
- if (rd)
- dim = (ml_dimension_t *) rd->ml_dimension;
+ RRDHOST *rh = rrdhost_find_by_hostname(string2str(host_id));
+ if (rh) {
+ RRDSET *rs = rrdset_find(rh, string2str(chart_id));
+ if (rs) {
+ acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
+ if (acq_rd) {
+ RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
+ if (rd)
+ dim = (ml_dimension_t *) rd->ml_dimension;
+ }
}
}
@@ -809,110 +785,12 @@ ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim)
}
static enum ml_training_result
-ml_acquired_dimension_train(ml_acquired_dimension_t acq_dim, const ml_training_request_t &TR)
+ml_acquired_dimension_train(ml_training_thread_t *training_thread, ml_acquired_dimension_t acq_dim, const ml_training_request_t &tr)
{
if (!acq_dim.dim)
return TRAINING_RESULT_NULL_ACQUIRED_DIMENSION;
- return ml_dimension_train_model(acq_dim.dim, TR);
-}
-
-#define WORKER_JOB_TRAINING_FIND 0
-#define WORKER_JOB_TRAINING_TRAIN 1
-#define WORKER_JOB_TRAINING_STATS 2
-
-static void
-ml_host_train(ml_host_t *host)
-{
- worker_register("MLTRAIN");
- worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find");
- worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
- worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
-
- service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t ) ml_host_cancel_training_thread, host->rh, true);
-
- while (service_running(SERVICE_ML_TRAINING)) {
- ml_training_request_t training_req = ml_queue_pop(host->training_queue);
- size_t queue_size = ml_queue_size(host->training_queue) + 1;
-
- if (host->threads_cancelled) {
- info("Stopping training thread for host %s because it was cancelled", rrdhost_hostname(host->rh));
- break;
- }
-
- usec_t allotted_ut = (Cfg.train_every * host->rh->rrd_update_every * USEC_PER_SEC) / queue_size;
- if (allotted_ut > USEC_PER_SEC)
- allotted_ut = USEC_PER_SEC;
-
- usec_t start_ut = now_monotonic_usec();
- enum ml_training_result training_res;
- {
- worker_is_busy(WORKER_JOB_TRAINING_FIND);
- ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(host->rh, training_req.chart_id, training_req.dimension_id);
-
- worker_is_busy(WORKER_JOB_TRAINING_TRAIN);
- training_res = ml_acquired_dimension_train(acq_dim, training_req);
-
- string_freez(training_req.chart_id);
- string_freez(training_req.dimension_id);
-
- ml_acquired_dimension_release(acq_dim);
- }
- usec_t consumed_ut = now_monotonic_usec() - start_ut;
-
- worker_is_busy(WORKER_JOB_TRAINING_STATS);
-
- usec_t remaining_ut = 0;
- if (consumed_ut < allotted_ut)
- remaining_ut = allotted_ut - consumed_ut;
-
- {
- netdata_mutex_lock(&host->mutex);
-
- host->ts.queue_size += queue_size;
- host->ts.num_popped_items += 1;
-
- host->ts.allotted_ut += allotted_ut;
- host->ts.consumed_ut += consumed_ut;
- host->ts.remaining_ut += remaining_ut;
-
- switch (training_res) {
- case TRAINING_RESULT_OK:
- host->ts.training_result_ok += 1;
- break;
- case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
- host->ts.training_result_invalid_query_time_range += 1;
- break;
- case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
- host->ts.training_result_not_enough_collected_values += 1;
- break;
- case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
- host->ts.training_result_null_acquired_dimension += 1;
- break;
- case TRAINING_RESULT_CHART_UNDER_REPLICATION:
- host->ts.training_result_chart_under_replication += 1;
- break;
- }
-
- netdata_mutex_unlock(&host->mutex);
- }
-
- worker_is_idle();
- std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut});
- worker_is_busy(0);
- }
-}
-
-static void *
-train_main(void *arg)
-{
- size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1);
- tls_data.training_cns = new calculated_number_t[max_elements_needed_for_training]();
- tls_data.scratch_training_cns = new calculated_number_t[max_elements_needed_for_training]();
-
- ml_host_t *host = (ml_host_t *) arg;
- ml_host_train(host);
- return NULL;
+ return ml_dimension_train_model(training_thread, acq_dim.dim, tr);
}
static void *
@@ -926,12 +804,10 @@ ml_detect_main(void *arg)
worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
worker_register_job_name(WORKER_JOB_DETECTION_STATS, "training stats");
- service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, NULL, NULL, true);
-
heartbeat_t hb;
heartbeat_init(&hb);
- while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) {
+ while (!Cfg.detection_stop) {
worker_is_idle();
heartbeat_next(&hb, USEC_PER_SEC);
@@ -945,6 +821,39 @@ ml_detect_main(void *arg)
ml_host_detect_once((ml_host_t *) rh->ml_host);
}
dfe_done(rhp);
+
+ if (Cfg.enable_statistics_charts) {
+ // collect and update training thread stats
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ netdata_mutex_lock(&training_thread->nd_mutex);
+ ml_training_stats_t training_stats = training_thread->training_stats;
+ training_thread->training_stats = {};
+ netdata_mutex_unlock(&training_thread->nd_mutex);
+
+ // calc the avg values
+ if (training_stats.num_popped_items) {
+ training_stats.queue_size /= training_stats.num_popped_items;
+ training_stats.allotted_ut /= training_stats.num_popped_items;
+ training_stats.consumed_ut /= training_stats.num_popped_items;
+ training_stats.remaining_ut /= training_stats.num_popped_items;
+ } else {
+ training_stats.queue_size = 0;
+ training_stats.allotted_ut = 0;
+ training_stats.consumed_ut = 0;
+ training_stats.remaining_ut = 0;
+
+ training_stats.training_result_ok = 0;
+ training_stats.training_result_invalid_query_time_range = 0;
+ training_stats.training_result_not_enough_collected_values = 0;
+ training_stats.training_result_null_acquired_dimension = 0;
+ training_stats.training_result_chart_under_replication = 0;
+ }
+
+ ml_update_training_statistics_chart(training_thread, training_stats);
+ }
+ }
}
return NULL;
@@ -978,31 +887,6 @@ bool ml_streaming_enabled()
return Cfg.stream_anomaly_detection_charts;
}
-void ml_init()
-{
- // Read config values
- ml_config_load(&Cfg);
-
- if (!Cfg.enable_anomaly_detection)
- return;
-
- // Generate random numbers to efficiently sample the features we need
- // for KMeans clustering.
- std::random_device RD;
- std::mt19937 Gen(RD());
-
- Cfg.random_nums.reserve(Cfg.max_train_samples);
- for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++)
- Cfg.random_nums.push_back(Gen());
-
-
- // start detection & training threads
- char tag[NETDATA_THREAD_TAG_MAX + 1];
-
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT");
- netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL);
-}
-
void ml_host_new(RRDHOST *rh)
{
if (!ml_enabled(rh))
@@ -1012,14 +896,12 @@ void ml_host_new(RRDHOST *rh)
host->rh = rh;
host->mls = ml_machine_learning_stats_t();
- host->ts = ml_training_stats_t();
+ //host->ts = ml_training_stats_t();
- host->host_anomaly_rate = 0.0;
- host->threads_running = false;
- host->threads_cancelled = false;
- host->threads_joined = false;
+ static std::atomic<size_t> times_called(0);
+ host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue;
- host->training_queue = ml_queue_init();
+ host->host_anomaly_rate = 0.0;
netdata_mutex_init(&host->mutex);
@@ -1033,7 +915,6 @@ void ml_host_delete(RRDHOST *rh)
return;
netdata_mutex_destroy(&host->mutex);
- ml_queue_destroy(host->training_queue);
delete host;
rh->ml_host = NULL;
@@ -1100,69 +981,6 @@ void ml_host_get_models(RRDHOST *rh, BUFFER *wb)
error("Fetching KMeans models is not supported yet");
}
-void ml_host_start_training_thread(RRDHOST *rh)
-{
- if (!rh || !rh->ml_host)
- return;
-
- ml_host_t *host = (ml_host_t *) rh->ml_host;
-
- if (host->threads_running) {
- error("Anomaly detections threads for host %s are already-up and running.", rrdhost_hostname(host->rh));
- return;
- }
-
- host->threads_running = true;
- host->threads_cancelled = false;
- host->threads_joined = false;
-
- char tag[NETDATA_THREAD_TAG_MAX + 1];
-
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(host->rh));
- netdata_thread_create(&host->training_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(host));
-}
-
-void ml_host_cancel_training_thread(RRDHOST *rh)
-{
- if (!rh || !rh->ml_host)
- return;
-
- ml_host_t *host = (ml_host_t *) rh->ml_host;
-
- if (!host->threads_running) {
- error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(host->rh));
- return;
- }
-
- if (!host->threads_cancelled) {
- host->threads_cancelled = true;
-
- // Signal the training queue to stop popping-items
- ml_queue_signal(host->training_queue);
- netdata_thread_cancel(host->training_thread);
- }
-}
-
-void ml_host_stop_training_thread(RRDHOST *rh)
-{
- if (!rh || !rh->ml_host)
- return;
-
- ml_host_cancel_training_thread(rh);
-
- ml_host_t *host = (ml_host_t *) rh->ml_host;
-
- if (!host->threads_joined) {
- host->threads_joined = true;
- host->threads_running = false;
-
- delete[] tls_data.training_cns;
- delete[] tls_data.scratch_training_cns;
-
- netdata_thread_join(host->training_thread, NULL);
- }
-}
-
void ml_chart_new(RRDSET *rs)
{
ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host;
@@ -1267,3 +1085,185 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool
return is_anomalous;
}
+
+static void *ml_train_main(void *arg) {
+ ml_training_thread_t *training_thread = (ml_training_thread_t *) arg;
+
+ char worker_name[1024];
+ snprintfz(worker_name, 1024, "training_thread_%zu", training_thread->id);
+ worker_register("MLTRAIN");
+
+ worker_register_job_name(WORKER_TRAIN_QUEUE_POP, "pop queue");
+ worker_register_job_name(WORKER_TRAIN_ACQUIRE_DIMENSION, "acquire");
+ worker_register_job_name(WORKER_TRAIN_QUERY, "query");
+ worker_register_job_name(WORKER_TRAIN_KMEANS, "kmeans");
+ worker_register_job_name(WORKER_TRAIN_UPDATE_MODELS, "update models");
+ worker_register_job_name(WORKER_TRAIN_RELEASE_DIMENSION, "release");
+ worker_register_job_name(WORKER_TRAIN_UPDATE_HOST, "update host");
+
+ while (!Cfg.training_stop) {
+ worker_is_busy(WORKER_TRAIN_QUEUE_POP);
+
+ ml_training_request_t training_req = ml_queue_pop(training_thread->training_queue);
+
+ // we know this thread has been cancelled, when the queue starts
+ // returning "null" requests without blocking on queue's pop().
+ if (training_req.host_id == NULL)
+ break;
+
+ size_t queue_size = ml_queue_size(training_thread->training_queue) + 1;
+
+ usec_t allotted_ut = (Cfg.train_every * USEC_PER_SEC) / queue_size;
+ if (allotted_ut > USEC_PER_SEC)
+ allotted_ut = USEC_PER_SEC;
+
+ usec_t start_ut = now_monotonic_usec();
+
+ enum ml_training_result training_res;
+ {
+ worker_is_busy(WORKER_TRAIN_ACQUIRE_DIMENSION);
+ ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(
+ training_req.host_id,
+ training_req.chart_id,
+ training_req.dimension_id);
+
+ training_res = ml_acquired_dimension_train(training_thread, acq_dim, training_req);
+
+ string_freez(training_req.host_id);
+ string_freez(training_req.chart_id);
+ string_freez(training_req.dimension_id);
+
+ worker_is_busy(WORKER_TRAIN_RELEASE_DIMENSION);
+ ml_acquired_dimension_release(acq_dim);
+ }
+
+ usec_t consumed_ut = now_monotonic_usec() - start_ut;
+
+ usec_t remaining_ut = 0;
+ if (consumed_ut < allotted_ut)
+ remaining_ut = allotted_ut - consumed_ut;
+
+ if (Cfg.enable_statistics_charts) {
+ worker_is_busy(WORKER_TRAIN_UPDATE_HOST);
+
+ netdata_mutex_lock(&training_thread->nd_mutex);
+
+ training_thread->training_stats.queue_size += queue_size;
+ training_thread->training_stats.num_popped_items += 1;
+
+ training_thread->training_stats.allotted_ut += allotted_ut;
+ training_thread->training_stats.consumed_ut += consumed_ut;
+ training_thread->training_stats.remaining_ut += remaining_ut;
+
+ switch (training_res) {
+ case TRAINING_RESULT_OK:
+ training_thread->training_stats.training_result_ok += 1;
+ break;
+ case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
+ training_thread->training_stats.training_result_invalid_query_time_range += 1;
+ break;
+ case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
+ training_thread->training_stats.training_result_not_enough_collected_values += 1;
+ break;
+ case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
+ training_thread->training_stats.training_result_null_acquired_dimension += 1;
+ break;
+ case TRAINING_RESULT_CHART_UNDER_REPLICATION:
+ training_thread->training_stats.training_result_chart_under_replication += 1;
+ break;
+ }
+
+ netdata_mutex_unlock(&training_thread->nd_mutex);
+ }
+
+ worker_is_idle();
+ std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut});
+ }
+
+ return NULL;
+}
+
+void ml_init()
+{
+ // Read config values
+ ml_config_load(&Cfg);
+
+ if (!Cfg.enable_anomaly_detection)
+ return;
+
+ // Generate random numbers to efficiently sample the features we need
+ // for KMeans clustering.
+ std::random_device RD;
+ std::mt19937 Gen(RD());
+
+ Cfg.random_nums.reserve(Cfg.max_train_samples);
+ for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++)
+ Cfg.random_nums.push_back(Gen());
+
+
+ // start detection & training threads
+ Cfg.detection_stop = false;
+ Cfg.training_stop = false;
+
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT");
+ netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL);
+
+ Cfg.training_threads.resize(Cfg.num_training_threads);
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+
+ size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1);
+ training_thread->training_cns = new calculated_number_t[max_elements_needed_for_training]();
+ training_thread->scratch_training_cns = new calculated_number_t[max_elements_needed_for_training]();
+
+ training_thread->id = idx;
+ training_thread->training_queue = ml_queue_init();
+ netdata_mutex_init(&training_thread->nd_mutex);
+
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%zu]", training_thread->id);
+ netdata_thread_create(&training_thread->nd_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_train_main, training_thread);
+ }
+}
+
+void ml_fini()
+{
+ Cfg.detection_stop = true;
+ Cfg.training_stop = true;
+
+ netdata_thread_cancel(Cfg.detection_thread);
+ netdata_thread_join(Cfg.detection_thread, NULL);
+
+ // signal the training queue of each thread
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ ml_queue_signal(training_thread->training_queue);
+ }
+
+ // cancel training threads
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ netdata_thread_cancel(training_thread->nd_thread);
+ }
+
+ // join training threads
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ netdata_thread_join(training_thread->nd_thread, NULL);
+ }
+
+ // clear training thread data
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ delete[] training_thread->training_cns;
+ delete[] training_thread->scratch_training_cns;
+ ml_queue_destroy(training_thread->training_queue);
+ netdata_mutex_destroy(&training_thread->nd_mutex);
+ }
+}