diff options
Diffstat (limited to 'ml/ml.cc')
-rw-r--r-- | ml/ml.cc | 892 |
1 files changed, 599 insertions, 293 deletions
@@ -7,15 +7,18 @@ #include <random> #include "ad_charts.h" +#include "database/sqlite/sqlite3.h" -typedef struct { - calculated_number_t *training_cns; - calculated_number_t *scratch_training_cns; - - std::vector<DSample> training_samples; -} ml_tls_data_t; +#define WORKER_TRAIN_QUEUE_POP 0 +#define WORKER_TRAIN_ACQUIRE_DIMENSION 1 +#define WORKER_TRAIN_QUERY 2 +#define WORKER_TRAIN_KMEANS 3 +#define WORKER_TRAIN_UPDATE_MODELS 4 +#define WORKER_TRAIN_RELEASE_DIMENSION 5 +#define WORKER_TRAIN_UPDATE_HOST 6 +#define WORKER_TRAIN_LOAD_MODELS 7 -static thread_local ml_tls_data_t tls_data; +static sqlite3 *db = NULL; /* * Functions to convert enums to strings @@ -173,26 +176,26 @@ ml_features_preprocess(ml_features_t *features) */ static void -ml_kmeans_init(ml_kmeans_t *kmeans, size_t num_clusters, size_t max_iterations) +ml_kmeans_init(ml_kmeans_t *kmeans) { - kmeans->num_clusters = num_clusters; - kmeans->max_iterations = max_iterations; - - kmeans->cluster_centers.reserve(kmeans->num_clusters); + kmeans->cluster_centers.reserve(2); kmeans->min_dist = std::numeric_limits<calculated_number_t>::max(); kmeans->max_dist = std::numeric_limits<calculated_number_t>::min(); } static void -ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features) +ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features, time_t after, time_t before) { + kmeans->after = (uint32_t) after; + kmeans->before = (uint32_t) before; + kmeans->min_dist = std::numeric_limits<calculated_number_t>::max(); kmeans->max_dist = std::numeric_limits<calculated_number_t>::min(); kmeans->cluster_centers.clear(); - dlib::pick_initial_centers(kmeans->num_clusters, kmeans->cluster_centers, features->preprocessed_features); - dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, kmeans->max_iterations); + dlib::pick_initial_centers(2, kmeans->cluster_centers, features->preprocessed_features); + dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, Cfg.max_kmeans_iters); for (const auto &preprocessed_feature : features->preprocessed_features) { calculated_number_t mean_dist = 0.0; @@ -201,7 +204,7 @@ ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features) mean_dist += dlib::length(cluster_center - preprocessed_feature); } - mean_dist /= kmeans->num_clusters; + mean_dist /= kmeans->cluster_centers.size(); if (mean_dist < kmeans->min_dist) kmeans->min_dist = mean_dist; @@ -218,7 +221,7 @@ ml_kmeans_anomaly_score(const ml_kmeans_t *kmeans, const DSample &DS) for (const auto &CC: kmeans->cluster_centers) mean_dist += dlib::length(CC - DS); - mean_dist /= kmeans->num_clusters; + mean_dist /= kmeans->cluster_centers.size(); if (kmeans->max_dist == kmeans->min_dist) return 0.0; @@ -264,7 +267,14 @@ ml_queue_pop(ml_queue_t *q) { netdata_mutex_lock(&q->mutex); - ml_training_request_t req = { NULL, NULL, 0, 0, 0 }; + ml_training_request_t req = { + NULL, // host_id + NULL, // chart id + NULL, // dimension id + 0, // current time + 0, // first entry + 0 // last entry + }; while (q->internal.empty()) { pthread_cond_wait(&q->cond_var, &q->mutex); @@ -307,7 +317,7 @@ ml_queue_signal(ml_queue_t *q) */ static std::pair<calculated_number_t *, ml_training_response_t> -ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t &training_request) +ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request) { ml_training_response_t training_response = {}; @@ -348,7 +358,7 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t STORAGE_PRIORITY_BEST_EFFORT); size_t idx = 0; - memset(tls_data.training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1)); + memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1)); calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN(); while (!storage_engine_query_is_finished(&handle)) { @@ -365,11 +375,11 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t training_response.db_after_t = timestamp; training_response.db_before_t = timestamp; - tls_data.training_cns[idx] = value; - last_value = tls_data.training_cns[idx]; + training_thread->training_cns[idx] = value; + last_value = training_thread->training_cns[idx]; training_response.collected_values++; } else - tls_data.training_cns[idx] = last_value; + training_thread->training_cns[idx] = last_value; idx++; } @@ -384,20 +394,270 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t } // Find first non-NaN value. - for (idx = 0; std::isnan(tls_data.training_cns[idx]); idx++, training_response.total_values--) { } + for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { } // Overwrite NaN values. if (idx != 0) - memmove(tls_data.training_cns, &tls_data.training_cns[idx], sizeof(calculated_number_t) * training_response.total_values); + memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values); training_response.result = TRAINING_RESULT_OK; - return { tls_data.training_cns, training_response }; + return { training_thread->training_cns, training_response }; +} + +const char *db_models_create_table = + "CREATE TABLE IF NOT EXISTS models(" + " dim_id BLOB, dim_str TEXT, after INT, before INT," + " min_dist REAL, max_dist REAL," + " c00 REAL, c01 REAL, c02 REAL, c03 REAL, c04 REAL, c05 REAL," + " c10 REAL, c11 REAL, c12 REAL, c13 REAL, c14 REAL, c15 REAL," + " PRIMARY KEY(dim_id, after)" + ");"; + +const char *db_models_add_model = + "INSERT OR REPLACE INTO models(" + " dim_id, dim_str, after, before," + " min_dist, max_dist," + " c00, c01, c02, c03, c04, c05," + " c10, c11, c12, c13, c14, c15)" + "VALUES(" + " @dim_id, @dim_str, @after, @before," + " @min_dist, @max_dist," + " @c00, @c01, @c02, @c03, @c04, @c05," + " @c10, @c11, @c12, @c13, @c14, @c15);"; + +const char *db_models_load = + "SELECT * FROM models " + "WHERE dim_id == @dim_id AND after >= @after ORDER BY before ASC;"; + +const char *db_models_delete = + "DELETE FROM models " + "WHERE dim_id = @dim_id AND before < @before;"; + +static int +ml_dimension_add_model(ml_dimension_t *dim) +{ + static __thread sqlite3_stmt *res = NULL; + int param = 0; + int rc = 0; + + if (unlikely(!db)) { + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db, db_models_add_model, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to store model, rc = %d", rc); + return 1; + } + } + + rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + char id[1024]; + snprintfz(id, 1024 - 1, "%s.%s", rrdset_id(dim->rd->rrdset), rrddim_id(dim->rd)); + rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.after); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_double(res, ++param, dim->kmeans.min_dist); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_double(res, ++param, dim->kmeans.max_dist); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + if (dim->kmeans.cluster_centers.size() != 2) + fatal("Expected 2 cluster centers, got %zu", dim->kmeans.cluster_centers.size()); + + for (const DSample &ds : dim->kmeans.cluster_centers) { + if (ds.size() != 6) + fatal("Expected dsample with 6 dimensions, got %ld", ds.size()); + + for (long idx = 0; idx != ds.size(); idx++) { + calculated_number_t cn = ds(idx); + int rc = sqlite3_bind_double(res, ++param, cn); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + } + } + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to store model, rc = %d", rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when storing model, rc = %d", rc); + + return 0; + +bind_fail: + error_report("Failed to bind parameter %d to store model, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to store model, rc = %d", rc); + return 1; +} + +static int +ml_dimension_delete_models(ml_dimension_t *dim) +{ + static __thread sqlite3_stmt *res = NULL; + int rc = 0; + int param = 0; + + if (unlikely(!db)) { + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db, db_models_delete, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to delete models, rc = %d", rc); + return 1; + } + } + + rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before - (Cfg.num_models_to_use * Cfg.train_every)); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to delete models, rc = %d", rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when deleting models, rc = %d", rc); + + return 0; + +bind_fail: + error_report("Failed to bind parameter %d to delete models, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to delete models, rc = %d", rc); + return 1; +} + +static int +ml_dimension_load_models(ml_dimension_t *dim) { + std::vector<ml_kmeans_t> V; + + static __thread sqlite3_stmt *res = NULL; + int rc = 0; + int param = 0; + + if (unlikely(!db)) { + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db, db_models_load, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to load models, rc = %d", rc); + return 1; + } + } + + rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, now_realtime_usec() - (Cfg.num_models_to_use * Cfg.max_train_samples)); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + dim->km_contexts.reserve(Cfg.num_models_to_use); + while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) { + ml_kmeans_t km; + + km.after = sqlite3_column_int(res, 2); + km.before = sqlite3_column_int(res, 3); + + km.min_dist = sqlite3_column_int(res, 4); + km.max_dist = sqlite3_column_int(res, 5); + + km.cluster_centers.resize(2); + + km.cluster_centers[0].set_size(Cfg.lag_n + 1); + km.cluster_centers[0](0) = sqlite3_column_double(res, 6); + km.cluster_centers[0](1) = sqlite3_column_double(res, 7); + km.cluster_centers[0](2) = sqlite3_column_double(res, 8); + km.cluster_centers[0](3) = sqlite3_column_double(res, 9); + km.cluster_centers[0](4) = sqlite3_column_double(res, 10); + km.cluster_centers[0](5) = sqlite3_column_double(res, 11); + + km.cluster_centers[1].set_size(Cfg.lag_n + 1); + km.cluster_centers[1](0) = sqlite3_column_double(res, 12); + km.cluster_centers[1](1) = sqlite3_column_double(res, 13); + km.cluster_centers[1](2) = sqlite3_column_double(res, 14); + km.cluster_centers[1](3) = sqlite3_column_double(res, 15); + km.cluster_centers[1](4) = sqlite3_column_double(res, 16); + km.cluster_centers[1](5) = sqlite3_column_double(res, 17); + + dim->km_contexts.push_back(km); + } + + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to load models, rc = %d", rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when loading models, rc = %d", rc); + + return 0; + +bind_fail: + error_report("Failed to bind parameter %d to load models, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to load models, rc = %d", rc); + return 1; +} + +static int +ml_dimension_update_models(ml_dimension_t *dim) +{ + int rc; + + if (dim->km_contexts.empty()) { + rc = ml_dimension_load_models(dim); + if (rc) + return rc; + } + + rc = ml_dimension_add_model(dim); + if (rc) + return rc; + + return ml_dimension_delete_models(dim); } static enum ml_training_result -ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &training_request) +ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request) { - auto P = ml_dimension_calculated_numbers(dim, training_request); + worker_is_busy(WORKER_TRAIN_QUERY); + auto P = ml_dimension_calculated_numbers(training_thread, dim, training_request); ml_training_response_t training_response = P.second; if (training_response.result != TRAINING_RESULT_OK) { @@ -426,31 +686,56 @@ ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &train } // compute kmeans + worker_is_busy(WORKER_TRAIN_KMEANS); { - memcpy(tls_data.scratch_training_cns, tls_data.training_cns, + memcpy(training_thread->scratch_training_cns, training_thread->training_cns, training_response.total_values * sizeof(calculated_number_t)); ml_features_t features = { Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n, - tls_data.scratch_training_cns, training_response.total_values, - tls_data.training_cns, training_response.total_values, - tls_data.training_samples + training_thread->scratch_training_cns, training_response.total_values, + training_thread->training_cns, training_response.total_values, + training_thread->training_samples }; ml_features_preprocess(&features); - ml_kmeans_init(&dim->kmeans, 2, 1000); - ml_kmeans_train(&dim->kmeans, &features); + ml_kmeans_init(&dim->kmeans); + ml_kmeans_train(&dim->kmeans, &features, training_response.query_after_t, training_response.query_before_t); } - // update kmeans models + // update models { netdata_mutex_lock(&dim->mutex); + worker_is_busy(WORKER_TRAIN_LOAD_MODELS); + + int rc = ml_dimension_update_models(dim); + if (rc) { + error("Failed to update models for %s [%u, %u]", rrddim_id(dim->rd), dim->kmeans.after, dim->kmeans.before); + } + + worker_is_busy(WORKER_TRAIN_UPDATE_MODELS); + if (dim->km_contexts.size() < Cfg.num_models_to_use) { dim->km_contexts.push_back(std::move(dim->kmeans)); } else { - std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts)); - dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans); + bool can_drop_middle_km = false; + + if (Cfg.num_models_to_use > 2) { + const ml_kmeans_t *old_km = &dim->km_contexts[dim->km_contexts.size() - 1]; + const ml_kmeans_t *middle_km = &dim->km_contexts[dim->km_contexts.size() - 2]; + const ml_kmeans_t *new_km = &dim->kmeans; + + can_drop_middle_km = (middle_km->after < old_km->before) && + (middle_km->before > new_km->after); + } + + if (can_drop_middle_km) { + dim->km_contexts.back() = dim->kmeans; + } else { + std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts)); + dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans); + } } dim->mt = METRIC_TYPE_CONSTANT; @@ -494,11 +779,16 @@ ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time) } if (schedule_for_training) { - ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host; ml_training_request_t req = { - string_dup(dim->rd->rrdset->id), string_dup(dim->rd->id), - curr_time, rrddim_first_entry_s(dim->rd), rrddim_last_entry_s(dim->rd), + string_dup(dim->rd->rrdset->rrdhost->hostname), + string_dup(dim->rd->rrdset->id), + string_dup(dim->rd->id), + curr_time, + rrddim_first_entry_s(dim->rd), + rrddim_last_entry_s(dim->rd), }; + + ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host; ml_queue_push(host->training_queue, req); } } @@ -674,7 +964,6 @@ ml_host_detect_once(ml_host_t *host) host->mls = {}; ml_machine_learning_stats_t mls_copy = {}; - ml_training_stats_t ts_copy = {}; { netdata_mutex_lock(&host->mutex); @@ -718,54 +1007,14 @@ ml_host_detect_once(ml_host_t *host) mls_copy = host->mls; - /* - * training stats - */ - ts_copy = host->ts; - - host->ts.queue_size = 0; - host->ts.num_popped_items = 0; - - host->ts.allotted_ut = 0; - host->ts.consumed_ut = 0; - host->ts.remaining_ut = 0; - - host->ts.training_result_ok = 0; - host->ts.training_result_invalid_query_time_range = 0; - host->ts.training_result_not_enough_collected_values = 0; - host->ts.training_result_null_acquired_dimension = 0; - host->ts.training_result_chart_under_replication = 0; - netdata_mutex_unlock(&host->mutex); } - // Calc the avg values - if (ts_copy.num_popped_items) { - ts_copy.queue_size /= ts_copy.num_popped_items; - ts_copy.allotted_ut /= ts_copy.num_popped_items; - ts_copy.consumed_ut /= ts_copy.num_popped_items; - ts_copy.remaining_ut /= ts_copy.num_popped_items; - - ts_copy.training_result_ok /= ts_copy.num_popped_items; - ts_copy.training_result_invalid_query_time_range /= ts_copy.num_popped_items; - ts_copy.training_result_not_enough_collected_values /= ts_copy.num_popped_items; - ts_copy.training_result_null_acquired_dimension /= ts_copy.num_popped_items; - ts_copy.training_result_chart_under_replication /= ts_copy.num_popped_items; - } else { - ts_copy.queue_size = 0; - ts_copy.allotted_ut = 0; - ts_copy.consumed_ut = 0; - ts_copy.remaining_ut = 0; - } - worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART); ml_update_dimensions_chart(host, mls_copy); worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART); ml_update_host_and_detection_rate_charts(host, host->host_anomaly_rate * 10000.0); - - worker_is_busy(WORKER_JOB_DETECTION_STATS); - ml_update_training_statistics_chart(host, ts_copy); } typedef struct { @@ -774,18 +1023,21 @@ typedef struct { } ml_acquired_dimension_t; static ml_acquired_dimension_t -ml_acquired_dimension_get(RRDHOST *rh, STRING *chart_id, STRING *dimension_id) +ml_acquired_dimension_get(STRING *host_id, STRING *chart_id, STRING *dimension_id) { RRDDIM_ACQUIRED *acq_rd = NULL; ml_dimension_t *dim = NULL; - RRDSET *rs = rrdset_find(rh, string2str(chart_id)); - if (rs) { - acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id)); - if (acq_rd) { - RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd); - if (rd) - dim = (ml_dimension_t *) rd->ml_dimension; + RRDHOST *rh = rrdhost_find_by_hostname(string2str(host_id)); + if (rh) { + RRDSET *rs = rrdset_find(rh, string2str(chart_id)); + if (rs) { + acq_rd = rrddim_find_and_acquire(rs, string2str(dimension_id)); + if (acq_rd) { + RRDDIM *rd = rrddim_acquired_to_rrddim(acq_rd); + if (rd) + dim = (ml_dimension_t *) rd->ml_dimension; + } } } @@ -806,110 +1058,12 @@ ml_acquired_dimension_release(ml_acquired_dimension_t acq_dim) } static enum ml_training_result -ml_acquired_dimension_train(ml_acquired_dimension_t acq_dim, const ml_training_request_t &TR) +ml_acquired_dimension_train(ml_training_thread_t *training_thread, ml_acquired_dimension_t acq_dim, const ml_training_request_t &tr) { if (!acq_dim.dim) return TRAINING_RESULT_NULL_ACQUIRED_DIMENSION; - return ml_dimension_train_model(acq_dim.dim, TR); -} - -#define WORKER_JOB_TRAINING_FIND 0 -#define WORKER_JOB_TRAINING_TRAIN 1 -#define WORKER_JOB_TRAINING_STATS 2 - -static void -ml_host_train(ml_host_t *host) -{ - worker_register("MLTRAIN"); - worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find"); - worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train"); - worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats"); - - service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t ) ml_host_cancel_training_thread, host->rh, true); - - while (service_running(SERVICE_ML_TRAINING)) { - ml_training_request_t training_req = ml_queue_pop(host->training_queue); - size_t queue_size = ml_queue_size(host->training_queue) + 1; - - if (host->threads_cancelled) { - info("Stopping training thread for host %s because it was cancelled", rrdhost_hostname(host->rh)); - break; - } - - usec_t allotted_ut = (Cfg.train_every * host->rh->rrd_update_every * USEC_PER_SEC) / queue_size; - if (allotted_ut > USEC_PER_SEC) - allotted_ut = USEC_PER_SEC; - - usec_t start_ut = now_monotonic_usec(); - enum ml_training_result training_res; - { - worker_is_busy(WORKER_JOB_TRAINING_FIND); - ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get(host->rh, training_req.chart_id, training_req.dimension_id); - - worker_is_busy(WORKER_JOB_TRAINING_TRAIN); - training_res = ml_acquired_dimension_train(acq_dim, training_req); - - string_freez(training_req.chart_id); - string_freez(training_req.dimension_id); - - ml_acquired_dimension_release(acq_dim); - } - usec_t consumed_ut = now_monotonic_usec() - start_ut; - - worker_is_busy(WORKER_JOB_TRAINING_STATS); - - usec_t remaining_ut = 0; - if (consumed_ut < allotted_ut) - remaining_ut = allotted_ut - consumed_ut; - - { - netdata_mutex_lock(&host->mutex); - - host->ts.queue_size += queue_size; - host->ts.num_popped_items += 1; - - host->ts.allotted_ut += allotted_ut; - host->ts.consumed_ut += consumed_ut; - host->ts.remaining_ut += remaining_ut; - - switch (training_res) { - case TRAINING_RESULT_OK: - host->ts.training_result_ok += 1; - break; - case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE: - host->ts.training_result_invalid_query_time_range += 1; - break; - case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES: - host->ts.training_result_not_enough_collected_values += 1; - break; - case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION: - host->ts.training_result_null_acquired_dimension += 1; - break; - case TRAINING_RESULT_CHART_UNDER_REPLICATION: - host->ts.training_result_chart_under_replication += 1; - break; - } - - netdata_mutex_unlock(&host->mutex); - } - - worker_is_idle(); - std::this_thread::sleep_for(std::chrono::microseconds{remaining_ut}); - worker_is_busy(0); - } -} - -static void * -train_main(void *arg) -{ - size_t max_elements_needed_for_training = Cfg.max_train_samples * (Cfg.lag_n + 1); - tls_data.training_cns = new calculated_number_t[max_elements_needed_for_training](); - tls_data.scratch_training_cns = new calculated_number_t[max_elements_needed_for_training](); - - ml_host_t *host = (ml_host_t *) arg; - ml_host_train(host); - return NULL; + return ml_dimension_train_model(training_thread, acq_dim.dim, tr); } static void * @@ -923,25 +1077,55 @@ ml_detect_main(void *arg) worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart"); worker_register_job_name(WORKER_JOB_DETECTION_STATS, "training stats"); - service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, NULL, NULL, true); - heartbeat_t hb; heartbeat_init(&hb); - while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) { + while (!Cfg.detection_stop) { worker_is_idle(); heartbeat_next(&hb, USEC_PER_SEC); - void *rhp; - dfe_start_reentrant(rrdhost_root_index, rhp) { - RRDHOST *rh = (RRDHOST *) rhp; - + RRDHOST *rh; + rrd_rdlock(); + rrdhost_foreach_read(rh) { if (!rh->ml_host) continue; ml_host_detect_once((ml_host_t *) rh->ml_host); } - dfe_done(rhp); + rrd_unlock(); + + if (Cfg.enable_statistics_charts) { + // collect and update training thread stats + for (size_t idx = 0; idx != Cfg.num_training_threads; idx++) { + ml_training_thread_t *training_thread = &Cfg.training_threads[idx]; + + netdata_mutex_lock(&training_thread->nd_mutex); + ml_training_stats_t training_stats = training_thread->training_stats; + training_thread->training_stats = {}; + netdata_mutex_unlock(&training_thread->nd_mutex); + + // calc the avg values + if (training_stats.num_popped_items) { + training_stats.queue_size /= training_stats.num_popped_items; + training_stats.allotted_ut /= training_stats.num_popped_items; + training_stats.consumed_ut /= training_stats.num_popped_items; + training_stats.remaining_ut /= training_stats.num_popped_items; + } else { + training_stats.queue_size = 0; + training_stats.allotted_ut = 0; + training_stats.consumed_ut = 0; + training_stats.remaining_ut = 0; + + training_stats.training_result_ok = 0; + training_stats.training_result_invalid_query_time_range = 0; + training_stats.training_result_not_enough_collected_values = 0; + training_stats.training_result_null_acquired_dimension = 0; + training_stats.training_result_chart_under_replication = 0; + } + + ml_update_training_statistics_chart(training_thread, training_stats); + } + } } return NULL; @@ -975,31 +1159,6 @@ bool ml_streaming_enabled() return Cfg.stream_anomaly_detection_charts; } -void ml_init() -{ - // Read config values - ml_config_load(&Cfg); - - if (!Cfg.enable_anomaly_detection) - return; - - // Generate random numbers to efficiently sample the features we need - // for KMeans clustering. - std::random_device RD; - std::mt19937 Gen(RD()); - - Cfg.random_nums.reserve(Cfg.max_train_samples); - for (size_t Idx = 0; Idx != Cfg.max_train_samples; Idx++) - Cfg.random_nums.push_back(Gen()); - - - // start detection & training threads - char tag[NETDATA_THREAD_TAG_MAX + 1]; - - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "%s", "PREDICT"); - netdata_thread_create(&Cfg.detection_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, ml_detect_main, NULL); -} - void ml_host_new(RRDHOST *rh) { if (!ml_enabled(rh)) @@ -1009,14 +1168,12 @@ void ml_host_new(RRDHOST *rh) host->rh = rh; host->mls = ml_machine_learning_stats_t(); - host->ts = ml_training_stats_t(); + //host->ts = ml_training_stats_t(); - host->host_anomaly_rate = 0.0; - host->threads_running = false; - host->threads_cancelled = false; - host->threads_joined = false; + static std::atomic<size_t> times_called(0); + host->training_queue = Cfg.training_threads[times_called++ % Cfg.num_training_threads].training_queue; - host->training_queue = ml_queue_init(); + host->host_anomaly_rate = 0.0; netdata_mutex_init(&host->mutex); @@ -1030,7 +1187,6 @@ void ml_host_delete(RRDHOST *rh) return; netdata_mutex_destroy(&host->mutex); - ml_queue_destroy(host->training_queue); delete host; rh->ml_host = NULL; @@ -1097,69 +1253,6 @@ void ml_host_get_models(RRDHOST *rh, BUFFER *wb) error("Fetching KMeans models is not supported yet"); } -void ml_host_start_training_thread(RRDHOST *rh) -{ - if (!rh || !rh->ml_host) - return; - - ml_host_t *host = (ml_host_t *) rh->ml_host; - - if (host->threads_running) { - error("Anomaly detections threads for host %s are already-up and running.", rrdhost_hostname(host->rh)); - return; - } - - host->threads_running = true; - host->threads_cancelled = false; - host->threads_joined = false; - - char tag[NETDATA_THREAD_TAG_MAX + 1]; - - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(host->rh)); - netdata_thread_create(&host->training_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(host)); -} - -void ml_host_cancel_training_thread(RRDHOST *rh) -{ - if (!rh || !rh->ml_host) - return; - - ml_host_t *host = (ml_host_t *) rh->ml_host; - - if (!host->threads_running) { - error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(host->rh)); - return; - } - - if (!host->threads_cancelled) { - host->threads_cancelled = true; - - // Signal the training queue to stop popping-items - ml_queue_signal(host->training_queue); - netdata_thread_cancel(host->training_thread); - } -} - -void ml_host_stop_training_thread(RRDHOST *rh) -{ - if (!rh || !rh->ml_host) - return; - - ml_host_cancel_training_thread(rh); - - ml_host_t *host = (ml_host_t *) rh->ml_host; - - if (!host->threads_joined) { - host->threads_joined = true; - host->threads_running = false; - - delete[] tls_data.training_cns; - delete[] tls_data.scratch_training_cns; - - netdata_thread_join(host->training_thread, NULL); - } -} - void ml_chart_new(RRDSET *rs) { ml_host_t *host = (ml_host_t *) rs->rrdhost->ml_host; @@ -1225,7 +1318,7 @@ void ml_dimension_new(RRDDIM *rd) dim->last_training_time = 0; - ml_kmeans_init(&dim->kmeans, 2, 1000); + ml_kmeans_init(&dim->kmeans); if (simple_pattern_matches(Cfg.sp_charts_to_skip, rrdset_name(rd->rrdset))) dim->mls = MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART; @@ -1264,3 +1357,216 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool return is_anomalous; } + +static void *ml_train_main(void *arg) { + ml_training_thread_t *training_thread = (ml_training_thread_t *) arg; + + char worker_name[1024]; + snprintfz(worker_name, 1024, "training_thread_%zu", training_thread->id); + worker_register("MLTRAIN"); + + worker_register_job_name(WORKER_TRAIN_QUEUE_POP, "pop queue"); + worker_register_job_name(WORKER_TRAIN_ACQUIRE_DIMENSION, "acquire"); + worker_register_job_name(WORKER_TRAIN_QUERY, "query"); + worker_register_job_name(WORKER_TRAIN_KMEANS, "kmeans"); + worker_register_job_name(WORKER_TRAIN_UPDATE_MODELS, "update models"); + worker_register_job_name(WORKER_TRAIN_LOAD_MODELS, "load models"); + worker_register_job_name(WORKER_TRAIN_RELEASE_DIMENSION, "release"); + worker_register_job_name(WORKER_TRAIN_UPDATE_HOST, "update host"); + + while (!Cfg.training_stop) { + worker_is_busy(WORKER_TRAIN_QUEUE_POP); + + ml_training_request_t training_req = ml_queue_pop(training_thread->training_queue); + + // we know this thread has been cancelled, when the queue starts + // returning "null" requests without blocking on queue's pop(). + if (training_req.host_id == NULL) + break; + + size_t queue_size = ml_queue_size(training_thread->training_queue) + 1; + + usec_t allotted_ut = (Cfg.train_every * USEC_PER_SEC) / queue_size; + if (allotted_ut > USEC_PER_SEC) + allotted_ut = USEC_PER_SEC; + + usec_t start_ut = now_monotonic_usec(); + + enum ml_training_result training_res; + { + worker_is_busy(WORKER_TRAIN_ACQUIRE_DIMENSION); + ml_acquired_dimension_t acq_dim = ml_acquired_dimension_get( + training_req.host_id, + training_req.chart_id, + training_req.dimension_id); + + training_res = ml_acquired_dimension_train(training_thread, acq_dim, training_req); + + string_freez(training_req.host_id); + string_freez(training_req.chart_id); + string_freez(training_req.dimension_id); + + worker_is_busy(WORKER_TRAIN_RELEASE_DIMENSION); + ml_acquired_dimension_release(acq_dim); + } + + usec_t consumed_ut = now_monotonic_usec() - start_ut; + + usec_t remaining_ut = 0; + if (consumed_ut < allotted_ut) + remaining_ut = allotted_ut - consumed_ut; + + if (Cfg.enable_statistics_charts) { + worker_is_busy(WORKER_TRAIN_UPDATE_HOST); + + netdata_mutex_lock(&training_thread->nd_mutex); + |