summaryrefslogtreecommitdiffstats
path: root/ml/ml.cc
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2023-04-20 11:24:24 +0300
committerGitHub <noreply@github.com>2023-04-20 11:24:24 +0300
commitba2a5e7857435cdcd217d5b7d4f78bd51ae6a6d1 (patch)
treea313e00fcd560f7bc886906d256b9d5792141848 /ml/ml.cc
parentca01da89dc6a061bf6ad2b315f88ba70b0456daf (diff)
Skip ML initialization when it's been disabled in netdata.conf (#14920)
* Apply ML changes again. The ML changes in 003df5f2 wheere reverted with 556bdad9 because we were partially initializing ML even when it was explicitly disabled in netdata.conf, causing the agent to crash on startup. * Do not start/stop ML threads when ML is disabled. * Restore default config settings.
Diffstat (limited to 'ml/ml.cc')
-rw-r--r--ml/ml.cc901
1 files changed, 608 insertions, 293 deletions
diff --git a/ml/ml.cc b/ml/ml.cc
index b5cf6d661d..405c03928d 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -7,15 +7,18 @@
#include <random>
#include "ad_charts.h"
+#include "database/sqlite/sqlite3.h"
-typedef struct {
- calculated_number_t *training_cns;
- calculated_number_t *scratch_training_cns;
-
- std::vector<DSample> training_samples;
-} ml_tls_data_t;
+#define WORKER_TRAIN_QUEUE_POP 0
+#define WORKER_TRAIN_ACQUIRE_DIMENSION 1
+#define WORKER_TRAIN_QUERY 2
+#define WORKER_TRAIN_KMEANS 3
+#define WORKER_TRAIN_UPDATE_MODELS 4
+#define WORKER_TRAIN_RELEASE_DIMENSION 5
+#define WORKER_TRAIN_UPDATE_HOST 6
+#define WORKER_TRAIN_LOAD_MODELS 7
-static thread_local ml_tls_data_t tls_data;
+static sqlite3 *db = NULL;
/*
* Functions to convert enums to strings
@@ -173,26 +176,26 @@ ml_features_preprocess(ml_features_t *features)
*/
static void
-ml_kmeans_init(ml_kmeans_t *kmeans, size_t num_clusters, size_t max_iterations)
+ml_kmeans_init(ml_kmeans_t *kmeans)
{
- kmeans->num_clusters = num_clusters;
- kmeans->max_iterations = max_iterations;
-
- kmeans->cluster_centers.reserve(kmeans->num_clusters);
+ kmeans->cluster_centers.reserve(2);
kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
}
static void
-ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features)
+ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features, time_t after, time_t before)
{
+ kmeans->after = (uint32_t) after;
+ kmeans->before = (uint32_t) before;
+
kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
kmeans->cluster_centers.clear();
- dlib::pick_initial_centers(kmeans->num_clusters, kmeans->cluster_centers, features->preprocessed_features);
- dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, kmeans->max_iterations);
+ dlib::pick_initial_centers(2, kmeans->cluster_centers, features->preprocessed_features);
+ dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, Cfg.max_kmeans_iters);
for (const auto &preprocessed_feature : features->preprocessed_features) {
calculated_number_t mean_dist = 0.0;
@@ -201,7 +204,7 @@ ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features)
mean_dist += dlib::length(cluster_center - preprocessed_feature);
}
- mean_dist /= kmeans->num_clusters;
+ mean_dist /= kmeans->cluster_centers.size();
if (mean_dist < kmeans->min_dist)
kmeans->min_dist = mean_dist;
@@ -218,7 +221,7 @@ ml_kmeans_anomaly_score(const ml_kmeans_t *kmeans, const DSample &DS)
for (const auto &CC: kmeans->cluster_centers)
mean_dist += dlib::length(CC - DS);
- mean_dist /= kmeans->num_clusters;
+ mean_dist /= kmeans->cluster_centers.size();
if (kmeans->max_dist == kmeans->min_dist)
return 0.0;
@@ -264,7 +267,14 @@ ml_queue_pop(ml_queue_t *q)
{
netdata_mutex_lock(&q->mutex);
- ml_training_request_t req = { NULL, NULL, 0, 0, 0 };
+ ml_training_request_t req = {
+ NULL, // host_id
+ NULL, // chart id
+ NULL, // dimension id
+ 0, // current time
+ 0, // first entry
+ 0 // last entry
+ };
while (q->internal.empty()) {
pthread_cond_wait(&q->cond_var, &q->mutex);
@@ -307,7 +317,7 @@ ml_queue_signal(ml_queue_t *q)
*/
static std::pair<calculated_number_t *, ml_training_response_t>
-ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t &training_request)
+ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
{
ml_training_response_t training_response = {};
@@ -348,7 +358,7 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
STORAGE_PRIORITY_BEST_EFFORT);
size_t idx = 0;
- memset(tls_data.training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
+ memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN();
while (!storage_engine_query_is_finished(&handle)) {
@@ -365,11 +375,11 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
training_response.db_after_t = timestamp;
training_response.db_before_t = timestamp;
- tls_data.training_cns[idx] = value;
- last_value = tls_data.training_cns[idx];
+ training_thread->training_cns[idx] = value;
+ last_value = training_thread->training_cns[idx];
training_response.collected_values++;
} else
- tls_data.training_cns[idx] = last_value;
+ training_thread->training_cns[idx] = last_value;
idx++;
}
@@ -384,20 +394,270 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
}
// Find first non-NaN value.
- for (idx = 0; std::isnan(tls_data.training_cns[idx]); idx++, training_response.total_values--) { }
+ for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { }
// Overwrite NaN values.
if (idx != 0)
- memmove(tls_data.training_cns, &tls_data.training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
+ memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
training_response.result = TRAINING_RESULT_OK;
- return { tls_data.training_cns, training_response };
+ return { training_thread->training_cns, training_response };
+}
+
+const char *db_models_create_table =
+ "CREATE TABLE IF NOT EXISTS models("
+ " dim_id BLOB, dim_str TEXT, after INT, before INT,"
+ " min_dist REAL, max_dist REAL,"
+ " c00 REAL, c01 REAL, c02 REAL, c03 REAL, c04 REAL, c05 REAL,"
+ " c10 REAL, c11 REAL, c12 REAL, c13 REAL, c14 REAL, c15 REAL,"
+ " PRIMARY KEY(dim_id, after)"
+ ");";
+
+const char *db_models_add_model =
+ "INSERT OR REPLACE INTO models("
+ " dim_id, dim_str, after, before,"
+ " min_dist, max_dist,"
+ " c00, c01, c02, c03, c04, c05,"
+ " c10, c11, c12, c13, c14, c15)"
+ "VALUES("
+ " @dim_id, @dim_str, @after, @before,"
+ " @min_dist, @max_dist,"
+ " @c00, @c01, @c02, @c03, @c04, @c05,"
+ " @c10, @c11, @c12, @c13, @c14, @c15);";
+
+const char *db_models_load =
+ "SELECT * FROM models "
+ "WHERE dim_id == @dim_id AND after >= @after ORDER BY before ASC;";
+
+const char *db_models_delete =
+ "DELETE FROM models "
+ "WHERE dim_id = @dim_id AND before < @before;";
+
+static int
+ml_dimension_add_model(ml_dimension_t *dim)
+{
+ static __thread sqlite3_stmt *res = NULL;
+ int param = 0;
+ int rc = 0;
+
+ if (unlikely(!db)) {
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ if (unlikely(!res)) {
+ rc = prepare_statement(db, db_models_add_model, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to store model, rc = %d", rc);
+ return 1;
+ }
+ }
+
+ rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ char id[1024];
+ snprintfz(id, 1024 - 1, "%s.%s", rrdset_id(dim->rd->rrdset), rrddim_id(dim->rd));
+ rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.after);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_double(res, ++param, dim->kmeans.min_dist);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_double(res, ++param, dim->kmeans.max_dist);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ if (dim->kmeans.cluster_centers.size() != 2)
+ fatal("Expected 2 cluster centers, got %zu", dim->kmeans.cluster_centers.size());
+
+ for (const DSample &ds : dim->kmeans.cluster_centers) {
+ if (ds.size() != 6)
+ fatal("Expected dsample with 6 dimensions, got %ld", ds.size());
+
+ for (long idx = 0; idx != ds.size(); idx++) {
+ calculated_number_t cn = ds(idx);
+ int rc = sqlite3_bind_double(res, ++param, cn);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+ }
+ }
+
+ rc = execute_insert(res);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to store model, rc = %d", rc);
+
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement when storing model, rc = %d", rc);
+
+ return 0;
+
+bind_fail:
+ error_report("Failed to bind parameter %d to store model, rc = %d", param, rc);
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement to store model, rc = %d", rc);
+ return 1;
+}
+
+static int
+ml_dimension_delete_models(ml_dimension_t *dim)
+{
+ static __thread sqlite3_stmt *res = NULL;
+ int rc = 0;
+ int param = 0;
+
+ if (unlikely(!db)) {
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ if (unlikely(!res)) {
+ rc = prepare_statement(db, db_models_delete, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to delete models, rc = %d", rc);
+ return 1;
+ }
+ }
+
+ rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before - (Cfg.num_models_to_use * Cfg.train_every));
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = execute_insert(res);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to delete models, rc = %d", rc);
+
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement when deleting models, rc = %d", rc);
+
+ return 0;
+
+bind_fail:
+ error_report("Failed to bind parameter %d to delete models, rc = %d", param, rc);
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement to delete models, rc = %d", rc);
+ return 1;
+}
+
+static int
+ml_dimension_load_models(ml_dimension_t *dim) {
+ std::vector<ml_kmeans_t> V;
+
+ static __thread sqlite3_stmt *res = NULL;
+ int rc = 0;
+ int param = 0;
+
+ if (unlikely(!db)) {
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ if (unlikely(!res)) {
+ rc = prepare_statement(db, db_models_load, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to load models, rc = %d", rc);
+ return 1;
+ }
+ }
+
+ rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, now_realtime_usec() - (Cfg.num_models_to_use * Cfg.max_train_samples));
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ dim->km_contexts.reserve(Cfg.num_models_to_use);
+ while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) {
+ ml_kmeans_t km;
+
+ km.after = sqlite3_column_int(res, 2);
+ km.before = sqlite3_column_int(res, 3);
+
+ km.min_dist = sqlite3_column_int(res, 4);
+ km.max_dist = sqlite3_column_int(res, 5);
+
+ km.cluster_centers.resize(2);
+
+ km.cluster_centers[0].set_size(Cfg.lag_n + 1);
+ km.cluster_centers[0](0) = sqlite3_column_double(res, 6);
+ km.cluster_centers[0](1) = sqlite3_column_double(res, 7);
+ km.cluster_centers[0](2) = sqlite3_column_double(res, 8);
+ km.cluster_centers[0](3) = sqlite3_column_double(res, 9);
+ km.cluster_centers[0](4) = sqlite3_column_double(res, 10);
+ km.cluster_centers[0](5) = sqlite3_column_double(res, 11);
+
+ km.cluster_centers[1].set_size(Cfg.lag_n + 1);
+ km.cluster_centers[1](0) = sqlite3_column_double(res, 12);
+ km.cluster_centers[1](1) = sqlite3_column_double(res, 13);
+ km.cluster_centers[1](2) = sqlite3_column_double(res, 14);
+ km.cluster_centers[1](3) = sqlite3_column_double(res, 15);
+ km.cluster_centers[1](4) = sqlite3_column_double(res, 16);
+ km.cluster_centers[1](5) = sqlite3_column_double(res, 17);
+
+ dim->km_contexts.push_back(km);
+ }
+
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to load models, rc = %d", rc);
+
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement when loading models, rc = %d", rc);
+
+ return 0;
+
+bind_fail:
+ error_report("Failed to bind parameter %d to load models, rc = %d", param, rc);
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement to load models, rc = %d", rc);
+ return 1;
+}
+
+static int
+ml_dimension_update_models(ml_dimension_t *dim)
+{
+ int rc;
+
+ if (dim->km_contexts.empty()) {
+ rc = ml_dimension_load_models(dim);
+ if (rc)
+ return rc;
+ }
+
+ rc = ml_dimension_add_model(dim);
+ if (rc)
+ return rc;
+
+ return ml_dimension_delete_models(dim);
}
static enum ml_training_result
-ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &training_request)
+ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
{
- auto P = ml_dimension_calculated_numbers(dim, training_request);
+ worker_is_busy(WORKER_TRAIN_QUERY);
+ auto P = ml_dimension_calculated_numbers(training_thread, dim, training_request);
ml_training_response_t training_response = P.second;
if (training_response.result != TRAINING_RESULT_OK) {
@@ -426,31 +686,56 @@ ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &train
}
// compute kmeans
+ worker_is_busy(WORKER_TRAIN_KMEANS);
{
- memcpy(tls_data.scratch_training_cns, tls_data.training_cns,
+ memcpy(training_thread->scratch_training_cns, training_thread->training_cns,
training_response.total_values * sizeof(calculated_number_t));
ml_features_t features = {
Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
- tls_data.scratch_training_cns, training_response.total_values,
- tls_data.training_cns, training_response.total_values,
- tls_data.training_samples
+ training_thread->scratch_training_cns, training_response.total_values,
+ training_thread->training_cns, training_response.total_values,
+ training_thread->training_samples
};
ml_features_preprocess(&features);
- ml_kmeans_init(&dim->kmeans, 2, 1000);
- ml_kmeans_train(&dim->kmeans, &features);
+ ml_kmeans_init(&dim->kmeans);
+ ml_kmeans_train(&dim->kmeans, &features, training_response.query_after_t, training_response.query_before_t);
}
- // update kmeans models
+ // update models
{
netdata_mutex_lock(&dim->mutex);
+ worker_is_busy(WORKER_TRAIN_LOAD_MODELS);
+
+ int rc = ml_dimension_update_models(dim);
+ if (rc) {
+ error("Failed to update models for %s [%u, %u]", rrddim_id(dim->rd), dim->kmeans.after, dim->kmeans.before);
+ }
+
+ worker_is_busy(WORKER_TRAIN_UPDATE_MODELS);
+
if (dim->km_contexts.size() < Cfg.num_models_to_use) {
dim->km_contexts.push_back(std::move(dim->kmeans));
} else {
- std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts));
- dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans);
+ bool can_drop_middle_km = false;
+
+ if (Cfg.num_models_to_use > 2) {
+ const ml_kmeans_t *old_km = &dim->km_contexts[dim->km_contexts.size() - 1];
+ const ml_kmeans_t *middle_km = &dim->km_contexts[dim->km_contexts.size() - 2];
+ const ml_kmeans_t *new_km = &dim->kmeans;
+
+ can_drop_middle_km = (middle_km->after < old_km->before) &&
+ (middle_km->before > new_km->after);
+ }
+
+ if (can_drop_middle_km) {
+ dim->km_contexts.back() = dim->kmeans;
+ } else {
+ std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts));
+ dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans);
+ }
}
dim->mt = METRIC_TYPE_CONSTANT;
@@ -494,11 +779,16 @@ ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time)
}
if (schedule_for_training) {
- ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
ml_training_request_t req = {
- string_dup(dim->rd->rrdset->id), string_dup(dim->rd->id),
- curr_time, rrddim_first_entry_s(dim->rd), rrddim_last_entry_s(dim->rd),
+ string_dup(dim->rd->rrdset->rrdhost->hostname),
+ string_dup(dim->rd->rrdset->id),
+ string_dup(dim->rd->id),
+ curr_time,
+ rrddim_first_entry_s(dim->rd),
+ rrddim_last_entry_s(dim->rd),
};
+
+ ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
ml_queue_push(host->training_queue, req);
}
}
@@ -674,7 +964,6 @@ ml_host_detect_once(ml_host_t *host)
host->mls = {};
ml_machine_learning_stats_t mls_copy = {};
- ml_training_stats_t ts_copy = {};
{
netdata_mutex_lock(&host->mutex);
@@ -718,54 +1007,14 @@ ml_host_detect_once(ml_host_t *host)
mls_copy = host->mls;
- /*
- * training stats
- */
- ts_copy = host->ts;
-
- host->ts.queue_size = 0;
- host->ts.num_popped_items = 0;
-
- host->ts.allotted_ut = 0;
- host->ts.consumed_ut = 0;
- host->ts.remaining_ut = 0;
-
- host->ts.training_result_ok = 0;
- host->ts.training_result_invalid_query_time_range = 0;
- host->ts.training_result_not_enough_collected_values = 0;
- host->ts.training_result_null_acquired_dimension = 0;
- host->ts.training_result_chart_under_replication = 0;
-
netdata_mutex_unlock(&host->mutex);
}
- // Calc the avg values
- if (ts_copy.num_popped_items) {
- ts_copy.queue_size /= ts_copy.num_popped_items;
- ts_copy.allotted_ut /= ts_copy.num_popped_items;
- ts_copy.consumed_ut /= ts_copy.num_popped_items;
- ts_copy.remaining_ut /= ts_copy.num_popped_items;
-
- ts_copy.training_result_ok /= ts_copy.num_popped_items;
- ts_copy.training_result_invalid_query_time_range /= ts_copy.num_popped_items;
- ts_copy.training_result_not_enough_collected_values /= ts_copy.num_popped_items;
- ts_copy.training_result_null_acquired_dimension /= ts_copy.num_popped_items;
- ts_copy.training_result_chart_under_replication /= ts_copy.num_popped_items;
- } else {
- ts_copy.queue_size = 0;
- ts_copy.allotted_ut = 0;
- ts_copy.consumed_ut = 0;
- ts_copy.remaining_ut = 0;
- }
-
worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
ml_update_dimensions_chart(host, mls_copy);
worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
ml_update_host_and_detection_rate_charts(host, host->host_anomaly_rate * 10000.0);
-
- worker_is_busy(WORKER_JOB_DETECTION_STATS);
- ml_update_training_statistics_chart(host, ts_copy);
}
typedef struct {
@@ -774,18 +1023,21 @@ typedef struct {
} ml_acquired_dimension_t;
static ml_acquired_dimension_t
-ml_acquired_dimension_get(RRDHOST *rh, STRING *chart_id, STRING *dimension_id)
+ml_acquired_dimension_get(STRING *host_id, STRING *chart_id, STRING *dimension_id)
{
RRDDIM_ACQUIRED *acq_rd = NULL;
ml_dimension_t *dim = NULL;
- RRDSET *rs = rrdset_find(rh, string2str(chart_id));
- if (rs) {
- acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
- if (acq_rd) {
- RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
- if (rd)
- dim = (ml_dimension_t *) rd->ml_dimension;
+ RRDHOST *rh = rrdhost_find_by_hostname(string2str(host_id));
+ if (rh) {
+ RRDSET *rs = rrdset_find(rh, string2str(chart_id));
+ if (rs) {
+ acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id));
+ if (acq_rd) {
+ RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd);
+ if (rd)
+ dim = (ml_dimension_t *) rd->ml_dimension;
+ }
}
}
@@ -806,110 +1058,12 @@ ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim)
}
static enum ml_training_result
-ml_acquired_dimension_train(ml_acquired_dimension_t acq_dim, const ml_training_request_t &TR)
+ml_acquired_dimension_train(ml_training_thread_t *training_thread, ml_acquired_dimension_t acq_dim, const ml_training_request_t &tr)
{
if (!acq_dim.dim)
return TRAINING_RESULT_NULL_ACQUIRED_DIMENSION;
- return ml_dimension_train_model(acq_dim.dim, TR);
-}
-
-#define WORKER_JOB_TRAINING_FIND 0
-#define WORKER_JOB_TRAINING_TRAIN 1
-#define WORKER_JOB_TRAINING_STATS 2
-
-static void
-ml_host_train(ml_host_t *host)
-{
- worker_register("MLTRAIN");
- worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find");
- worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
- worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
-
- service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t ) ml_host_cancel_training_thread, host->rh, true);
-
- while (service_running(SERVICE_ML_TRAINING)) {
- ml_training_request_t training_req = ml_queue_pop(host->training_queue);
- size_t queue_size = ml_queue_size(host->training_queue) + 1;
-
- if (host->threads_cancelled) {
- info("Stopping training thread for host %s because it was cancelled", rrdhost_hostname(host->rh));
- break;
- }
-
- usec_t allotted_ut = (Cfg.train_every * host->rh->rrd_update_every * USEC_PER_SEC) / queue_size;
- if (allotted_ut > USEC_PER_SEC)
- allotted_ut = USEC_PER_SEC;
-
- usec_t start_ut = now_monotonic_usec();
- enum ml_training_result training_res;
- {
- worker_is_busy(WORKER_JOB_TRAINING_FIND);
- ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(host->rh, training_req.chart_id, training_req.dimension_id);
-
- worker_is_busy(WORKER_JOB_TRAINING_TRAIN);
- training_res = ml_acquired_dimension_train(acq_dim, training_req);
-
- string_freez(training_req.chart_id);
- string_freez(training_req.dimension_id);
-
- ml_acquired_dimension_release(acq_dim);
- }
- usec_t consumed_ut = now_monotonic_usec() - start_ut;
-
- worker_is_busy(WORKER_JOB_TRAINING_STATS);
-
- usec_t remaining_ut = 0;
- if (consumed_ut < allotted_ut)
- remaining_ut = allotted_ut - consumed_ut;
-
- {
- netdata_mutex_lock(&host->mutex);
-
- host->ts.queue_size += queue_size;
- host->ts.num_popped_items += 1;
-
- host->ts.allotted_ut += allotted_ut;
- host->ts.consumed_ut += consumed_ut;
- host->ts.remaining_ut += remaining_ut;
-
- switch (training_res) {
- case TRAINING_RESULT_OK:
- host->ts.training_result_ok += 1;
- break;
- case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
- host->ts.training_result_invalid_query_time_range += 1;
- break;
- case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
- host->ts.training_result_not_enough_collected_values += 1;
- break;
- case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
- host->ts.training_result_null_acquired_dimension += 1;
- break;
- case TRAINING_RESULT_CHART_UNDER_REPLICATION:
- host->ts.training_result_chart_under_replication += 1;
- break;
- }
-
- netdata_mutex_unlock(&host->mutex);
- }
-
- worker_is_idle();
- std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut});
- worker_is_busy(0);
- }
-}
-
-static void *
-train_main(void *arg)
-{
- size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1);
- tls_data.training_cns = new calculated_number_t[max_elements_needed_for_training]();
- tls_data.scratch_training_cns = new calculated_number_t[max_elements_needed_for_training]();
-
- ml_host_t *host = (ml_host_t *) arg;
- ml_host_train(host);
- return NULL;
+ return ml_dimension_train_model(training_thread, acq_dim.dim, tr);
}
static void *
@@ -923,25 +1077,55 @@ ml_detect_main(void *arg)
worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
worker_register_job_name(WORKER_JOB_DETECTION_STATS, "training stats");
- service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, NULL, NULL, true);
-
heartbeat_t hb;
heartbeat_init(&hb);
- while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) {
+ while (!Cfg.detection_stop) {
worker_is_idle();
heartbeat_next(&hb, USEC_PER_SEC);
- void *rhp;
- dfe_start_reentrant(rrdhost_root_index, rhp) {
- RRDHOST *rh = (RRDHOST *) rhp;
-
+ RRDHOST *rh;
+ rrd_rdlock();
+ rrdhost_foreach_read(rh) {
if (!rh->ml_host)
continue;
ml_host_detect_once((ml_host_t *) rh->ml_host);
}
- dfe_done(rhp);
+ rrd_unlock();
+
+ if (Cfg.enable_statistics_charts) {
+ // collect and update training thread stats
+ for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) {
+ ml_training_thread_t *training_thread = &Cfg.training_threads[idx];
+
+ netdata_mutex_lock(&training_thread->nd_mutex);
+ ml_training_stats_t training_stats = training_thread->training_stats;
+ training_thread->training_stats = {};
+ netdata_mutex_unlock(&training_thread->nd_mutex);
+
+ // calc the avg values
+ if (training_stats.num_popped_items) {
+ training_stats.queue_size /= training_stats.num_popped_items;
+ training_stats.allotted_ut /= training_stats.num_popped_items;
+ training_stats.consumed_ut /= training_stats.num_popped_items;
+ training_stats.remaining_ut /= training_stats.num_popped_items;
+ } else {
+ training_stats.queue_size = 0;
+ training_stats.allotted_ut = 0;
+ training_stats.consumed_ut = 0;
+ training_stats.remaining_ut = 0;
+
+ training_stats.training_result_ok = 0;
+ training_stats.training_result_invalid_query_time_range = 0;
+ training_stats.training_result_not_enough_collected_values = 0;
+ training_stats.training_result_null_acquired_dimension = 0;
+ training_stats.training_result_chart_under_replication = 0;
+ }
+
+ ml_update_training_statistics_chart(training_thread, training_stats);
+ }
+ }
}
return NULL;
@@ -975,31 +1159,6 @@ bool ml_streaming_enabled()
return Cfg.stream_anomaly_detection_charts;
}
-void ml_init()
-{
- // Read config values
- ml_config_load(&Cfg);
-
- if (!Cfg.enable_anomaly_detection)
- return;
-
- // Generate random numbers to efficiently sample the features we need
- // for KMeans clustering.
- std::random_device RD;
- std::mt19937 Gen(RD());
-
- Cfg.random_nums.reserve(Cfg.max_train_samples);
- for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++)
- Cfg.random_nums.push_back(Gen());
-
-
- // start detection & training threads
- char tag[NETDATA_THREAD_TAG_MAX + 1];
-
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT");
- netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL);
-}
-
void ml_host_new(RRDHOST *rh)
{
if (!ml_enabled(rh))
@@ -1009,14 +1168,12 @@ void ml_host_new(RRDHOST *rh)
host->rh = rh;
host->mls = ml_machine_learning_stats_t();
- host->ts = ml_training_stats_t();
+ //host->ts = ml_training_stats_t();
- host->host_anomaly_rate = 0.0;
- host->threads_running = false;
- host->threads_cancelled = false;
- host->threads_joined = false;
+ static std::atomic<size_t> times_called(0);
+ host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue;
- host->training_queue = ml_queue_init();
+ host->host_anomaly_rate = 0.0;
netdata_mutex_init(&host->mutex);
@@ -1030,7 +1187,6 @@ void ml_host_delete(RRDHOST *rh)
return;
netdata_mutex_destroy(&host->mutex);
- ml_queue_destroy(host->training_queue);
delete host;
rh->ml_host = NULL;
@@ -1097,69 +1253,6 @@ void ml_host_get_models(RRDHOST *rh, BUFFER *wb)
error("Fetching KMeans models is not supported yet");
}
-void ml_host_start_training_thread(RRDHOST *rh)
-{
- if (!rh || !rh->ml_host)
- return;
-
- ml_host_t *host = (ml_host_t *) rh->ml_host;
-
- if (host->threads_running) {
- error("Anomaly detections threads for host %s are already-up and running.", rrdhost_hostname(host->rh));
- return;
- }
-
- host->threads_running = true;
- host->threads_cancelled = false;
- host->threads_joined = false;
-
- char tag[NETDATA_THREAD_TAG_MAX + 1];
-
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(host->rh));
- netdata_thread_create(&host->training_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(host));
-}
-
-void ml_host_cancel_training_thread(RRDHOST *rh)
-{
- if (!rh || !rh->ml_host)
- return;
-
- ml_host_t *host = (ml_host_t *) rh->ml_host;
-
- if (!host->threads_running) {
- error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(host->rh));
- return;
- }
-
- if (!host->threads_cancelled) {
- host->threads_cancelled = true;
-
- // Signal the training queue to stop popping-items
- ml_queue_signal(host->training_queue);
- netdata_thread_cancel(host->training_thread);
- }
-}
-
-void ml_host_stop_training_thread(RRDHOST *rh)
-{
- if (!rh || !rh->ml_host)
- return;
-
- ml_host_cancel_training_thread(rh);
-
- ml_host_t *host = (ml_host_t *) rh->ml_host;
-
- if (!host->threads_joined) {
- host->threads_joined = true;
- host->threads_running = false;
-
- delete[] tls_data.training_cns;
- delete[] tls_data.scratch_training_cns;
-
- netdata_thread_join(host->training_thread, NULL);
- }
-}
-
void ml_chart_new(RRDSET *rs)
{
ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host;
@@ -1225,7 +1318,7 @@ void ml_dimension_new(RRDDIM *rd)
dim->last_training_time = 0;
- ml_kmeans_init(&dim->kmeans, 2, 1000);
+ ml_kmeans_init(&dim->kmeans);
if (simple_pattern_matches(Cfg.sp_charts_to_skip, rrdset_name(rd->rrdset)))
dim->mls = MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART;
@@ -1264,3 +1357,225 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool
return is_anomalous;
}
+
+static void *ml_train_main(void *arg) {
+ ml_training_thread_t *training_thread = (ml_training_thread_t *) arg;
+
+ char worker_name[1024];
+ snprintfz(worker_name, 1024, "training_thread_%zu", training_thread->id);
+ worker_register("MLTRAIN");
+
+ worker_register_job_name(WORKER_TRAIN_QUEUE_POP, "pop queue");
+ worker_register_job_name(WORKER_TRAIN_ACQUIRE_DIMENSION, "acquire");
+ worker_register_job_name(WORKER_TRAIN_QUERY, "query");
+ worker_register_job_name(WORKER_TRAIN_KMEANS, "kmeans");
+ worker_register_job_name(WORKER_TRAIN_UPDATE_MODELS, "update models");
+ worker_register_job_name(WORKER_TRAIN_LOAD_MODELS, "load models");
+ worker_register_job_name(WORKER_TRAIN_RELEASE_DIMENSION, "release");
+ worker_register_job_name(WORKER_TRAIN_UPDATE_HOST, "update host");
+
+ while (!Cfg.training_stop) {
+ worker_is_busy(WORKER_TRAIN_QUEUE_POP);
+
+ ml_training_request_t training_req = ml_queue_pop(training_thread->training_queue);
+
+ // we know this thread has been cancelled, when the queue starts
+ // returning "null" requests without blocking on queue's pop().
+ if (training_req.host_id == NULL)
+ break;