diff options
Diffstat (limited to 'ml')
-rw-r--r-- | ml/Config.cc | 12 | ||||
-rw-r--r-- | ml/ad_charts.cc | 167 | ||||
-rw-r--r-- | ml/ad_charts.h | 2 | ||||
-rw-r--r-- | ml/ml-dummy.c | 10 | ||||
-rw-r--r-- | ml/ml-private.h | 42 | ||||
-rw-r--r-- | ml/ml.cc | 901 | ||||
-rw-r--r-- | ml/ml.h | 11 |
7 files changed, 764 insertions, 381 deletions
diff --git a/ml/Config.cc b/ml/Config.cc index 8b04590d77..415d11b838 100644 --- a/ml/Config.cc +++ b/ml/Config.cc @@ -34,7 +34,7 @@ void ml_config_load(ml_config_t *cfg) { unsigned smooth_n = config_get_number(config_section_ml, "num samples to smooth", 3); unsigned lag_n = config_get_number(config_section_ml, "num samples to lag", 5); - double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / lag_n); + double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / 5.0 /* default lag_n */); unsigned max_kmeans_iters = config_get_number(config_section_ml, "maximum number of k-means iterations", 1000); double dimension_anomaly_rate_threshold = config_get_float(config_section_ml, "dimension anomaly score threshold", 0.99); @@ -43,6 +43,10 @@ void ml_config_load(ml_config_t *cfg) { std::string anomaly_detection_grouping_method = config_get(config_section_ml, "anomaly detection grouping method", "average"); time_t anomaly_detection_query_duration = config_get_number(config_section_ml, "anomaly detection grouping duration", 5 * 60); + size_t num_training_threads = config_get_number(config_section_ml, "num training threads", 4); + + bool enable_statistics_charts = config_get_boolean(config_section_ml, "enable statistics charts", false); + /* * Clamp */ @@ -64,6 +68,8 @@ void ml_config_load(ml_config_t *cfg) { host_anomaly_rate_threshold = clamp(host_anomaly_rate_threshold, 0.1, 10.0); anomaly_detection_query_duration = clamp<time_t>(anomaly_detection_query_duration, 60, 15 * 60); + num_training_threads = clamp<size_t>(num_training_threads, 1, 128); + /* * Validate */ @@ -109,4 +115,8 @@ void ml_config_load(ml_config_t *cfg) { cfg->sp_charts_to_skip = simple_pattern_create(cfg->charts_to_skip.c_str(), NULL, SIMPLE_PATTERN_EXACT, true); cfg->stream_anomaly_detection_charts = config_get_boolean(config_section_ml, "stream anomaly detection charts", true); + + cfg->num_training_threads = num_training_threads; + + cfg->enable_statistics_charts = enable_statistics_charts; } diff --git a/ml/ad_charts.cc b/ml/ad_charts.cc index a32ff6c650..086cd5aa02 100644 --- a/ml/ad_charts.cc +++ b/ml/ad_charts.cc @@ -6,7 +6,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats /* * Machine learning status */ - { + if (Cfg.enable_statistics_charts) { if (!host->machine_learning_status_rs) { char id_buf[1024]; char name_buf[1024]; @@ -48,7 +48,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats /* * Metric type */ - { + if (Cfg.enable_statistics_charts) { if (!host->metric_type_rs) { char id_buf[1024]; char name_buf[1024]; @@ -90,7 +90,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats /* * Training status */ - { + if (Cfg.enable_statistics_charts) { if (!host->training_status_rs) { char id_buf[1024]; char name_buf[1024]; @@ -179,7 +179,6 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats rrdset_done(host->dimensions_rs); } - } void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number AnomalyRate) { @@ -301,20 +300,20 @@ void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number } } -void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts) { +void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts) { /* * queue stats */ { - if (!host->queue_stats_rs) { + if (!training_thread->queue_stats_rs) { char id_buf[1024]; char name_buf[1024]; - snprintfz(id_buf, 1024, "queue_stats_on_%s", localhost->machine_guid); - snprintfz(name_buf, 1024, "queue_stats_on_%s", rrdhost_hostname(localhost)); + snprintfz(id_buf, 1024, "training_queue_%zu_stats", training_thread->id); + snprintfz(name_buf, 1024, "training_queue_%zu_stats", training_thread->id); - host->queue_stats_rs = rrdset_create( - host->rh, + training_thread->queue_stats_rs = rrdset_create( + localhost, "netdata", // type id_buf, // id name_buf, // name @@ -328,35 +327,35 @@ void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stat localhost->rrd_update_every, // update_every RRDSET_TYPE_LINE// chart_type ); - rrdset_flag_set(host->queue_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION); + rrdset_flag_set(training_thread->queue_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION); - host->queue_stats_queue_size_rd = - rrddim_add(host->queue_stats_rs, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - host->queue_stats_popped_items_rd = - rrddim_add(host->queue_stats_rs, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + training_thread->queue_stats_queue_size_rd = + rrddim_add(training_thread->queue_stats_rs, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + training_thread->queue_stats_popped_items_rd = + rrddim_add(training_thread->queue_stats_rs, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); } - rrddim_set_by_pointer(host->queue_stats_rs, - host->queue_stats_queue_size_rd, ts.queue_size); - rrddim_set_by_pointer(host->queue_stats_rs, - host->queue_stats_popped_items_rd, ts.num_popped_items); + rrddim_set_by_pointer(training_thread->queue_stats_rs, + training_thread->queue_stats_queue_size_rd, ts.queue_size); + rrddim_set_by_pointer(training_thread->queue_stats_rs, + training_thread->queue_stats_popped_items_rd, ts.num_popped_items); - rrdset_done(host->queue_stats_rs); + rrdset_done(training_thread->queue_stats_rs); } /* * training stats */ { - if (!host->training_time_stats_rs) { + if (!training_thread->training_time_stats_rs) { char id_buf[1024]; char name_buf[1024]; - snprintfz(id_buf, 1024, "training_time_stats_on_%s", localhost->machine_guid); - snprintfz(name_buf, 1024, "training_time_stats_on_%s", rrdhost_hostname(localhost)); + snprintfz(id_buf, 1024, "training_queue_%zu_time_stats", training_thread->id); + snprintfz(name_buf, 1024, "training_queue_%zu_time_stats", training_thread->id); - host->training_time_stats_rs = rrdset_create( - host->rh, + training_thread->training_time_stats_rs = rrdset_create( + localhost, "netdata", // type id_buf, // id name_buf, // name @@ -370,39 +369,39 @@ void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stat localhost->rrd_update_every, // update_every RRDSET_TYPE_LINE// chart_type ); - rrdset_flag_set(host->training_time_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION); - - host->training_time_stats_allotted_rd = - rrddim_add(host->training_time_stats_rs, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); - host->training_time_stats_consumed_rd = - rrddim_add(host->training_time_stats_rs, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); - host->training_time_stats_remaining_rd = - rrddim_add(host->training_time_stats_rs, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); + rrdset_flag_set(training_thread->training_time_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + training_thread->training_time_stats_allotted_rd = + rrddim_add(training_thread->training_time_stats_rs, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_time_stats_consumed_rd = + rrddim_add(training_thread->training_time_stats_rs, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_time_stats_remaining_rd = + rrddim_add(training_thread->training_time_stats_rs, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); } - rrddim_set_by_pointer(host->training_time_stats_rs, - host->training_time_stats_allotted_rd, ts.allotted_ut); - rrddim_set_by_pointer(host->training_time_stats_rs, - host->training_time_stats_consumed_rd, ts.consumed_ut); - rrddim_set_by_pointer(host->training_time_stats_rs, - host->training_time_stats_remaining_rd, ts.remaining_ut); + rrddim_set_by_pointer(training_thread->training_time_stats_rs, + training_thread->training_time_stats_allotted_rd, ts.allotted_ut); + rrddim_set_by_pointer(training_thread->training_time_stats_rs, + training_thread->training_time_stats_consumed_rd, ts.consumed_ut); + rrddim_set_by_pointer(training_thread->training_time_stats_rs, + training_thread->training_time_stats_remaining_rd, ts.remaining_ut); - rrdset_done(host->training_time_stats_rs); + rrdset_done(training_thread->training_time_stats_rs); } /* * training result stats */ { - if (!host->training_results_rs) { + if (!training_thread->training_results_rs) { char id_buf[1024]; char name_buf[1024]; - snprintfz(id_buf, 1024, "training_results_on_%s", localhost->machine_guid); - snprintfz(name_buf, 1024, "training_results_on_%s", rrdhost_hostname(localhost)); + snprintfz(id_buf, 1024, "training_queue_%zu_results", training_thread->id); + snprintfz(name_buf, 1024, "training_queue_%zu_results", training_thread->id); - host->training_results_rs = rrdset_create( - host->rh, + training_thread->training_results_rs = rrdset_create( + localhost, "netdata", // type id_buf, // id name_buf, // name @@ -416,31 +415,61 @@ void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stat localhost->rrd_update_every, // update_every RRDSET_TYPE_LINE// chart_type ); - rrdset_flag_set(host->training_results_rs, RRDSET_FLAG_ANOMALY_DETECTION); - - host->training_results_ok_rd = - rrddim_add(host->training_results_rs, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - host->training_results_invalid_query_time_range_rd = - rrddim_add(host->training_results_rs, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - host->training_results_not_enough_collected_values_rd = - rrddim_add(host->training_results_rs, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - host->training_results_null_acquired_dimension_rd = - rrddim_add(host->training_results_rs, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - host->training_results_chart_under_replication_rd = - rrddim_add(host->training_results_rs, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + rrdset_flag_set(training_thread->training_results_rs, RRDSET_FLAG_ANOMALY_DETECTION); + + training_thread->training_results_ok_rd = + rrddim_add(training_thread->training_results_rs, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_results_invalid_query_time_range_rd = + rrddim_add(training_thread->training_results_rs, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_results_not_enough_collected_values_rd = + rrddim_add(training_thread->training_results_rs, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_results_null_acquired_dimension_rd = + rrddim_add(training_thread->training_results_rs, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + training_thread->training_results_chart_under_replication_rd = + rrddim_add(training_thread->training_results_rs, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); } - rrddim_set_by_pointer(host->training_results_rs, - host->training_results_ok_rd, ts.training_result_ok); - rrddim_set_by_pointer(host->training_results_rs, - host->training_results_invalid_query_time_range_rd, ts.training_result_invalid_query_time_range); - rrddim_set_by_pointer(host->training_results_rs, - host->training_results_not_enough_collected_values_rd, ts.training_result_not_enough_collected_values); - rrddim_set_by_pointer(host->training_results_rs, - host->training_results_null_acquired_dimension_rd, ts.training_result_null_acquired_dimension); - rrddim_set_by_pointer(host->training_results_rs, - host->training_results_chart_under_replication_rd, ts.training_result_chart_under_replication); - - rrdset_done(host->training_results_rs); + rrddim_set_by_pointer(training_thread->training_results_rs, + training_thread->training_results_ok_rd, ts.training_result_ok); + rrddim_set_by_pointer(training_thread->training_results_rs, + training_thread->training_results_invalid_query_time_range_rd, ts.training_result_invalid_query_time_range); + rrddim_set_by_pointer(training_thread->training_results_rs, + training_thread->training_results_not_enough_collected_values_rd, ts.training_result_not_enough_collected_values); + rrddim_set_by_pointer(training_thread->training_results_rs, + training_thread->training_results_null_acquired_dimension_rd, ts.training_result_null_acquired_dimension); + rrddim_set_by_pointer(training_thread->training_results_rs, + training_thread->training_results_chart_under_replication_rd, ts.training_result_chart_under_replication); + + rrdset_done(training_thread->training_results_rs); + } +} + +void ml_update_global_statistics_charts(uint64_t models_consulted) { + if (Cfg.enable_statistics_charts) { + static RRDSET *st = NULL; + static RRDDIM *rd = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata" // type + , "ml_models_consulted" // id + , NULL // name + , NETDATA_ML_CHART_FAMILY // family + , NULL // context + , "KMeans models used for prediction" // title + , "models" // units + , NETDATA_ML_PLUGIN // plugin + , NETDATA_ML_MODULE_DETECTION // module + , NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS // priority + , localhost->rrd_update_every // update_every + , RRDSET_TYPE_AREA // chart_type + ); + + rd = rrddim_add(st, "num_models_consulted", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL); + } + + rrddim_set_by_pointer(st, rd, (collected_number) models_consulted); + + rrdset_done(st); } } diff --git a/ml/ad_charts.h b/ml/ad_charts.h index a973b44a51..349b369a24 100644 --- a/ml/ad_charts.h +++ b/ml/ad_charts.h @@ -9,6 +9,6 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number anomaly_rate); -void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts); +void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts); #endif /* ML_ADCHARTS_H */ diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c index 53444e246f..6ea0818c68 100644 --- a/ml/ml-dummy.c +++ b/ml/ml-dummy.c @@ -19,6 +19,12 @@ bool ml_streaming_enabled() { void ml_init(void) {} +void ml_fini(void) {} + +void ml_start_threads(void) {} + +void ml_stop_threads(void) {} + void ml_host_new(RRDHOST *rh) { UNUSED(rh); } @@ -86,4 +92,8 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool return false; } +void ml_update_global_statistics_charts(uint64_t models_consulted) { + UNUSED(models_consulted); +} + #endif diff --git a/ml/ml-private.h b/ml/ml-private.h index 173b82e265..d014c71d26 100644 --- a/ml/ml-private.h +++ b/ml/ml-private.h @@ -33,14 +33,15 @@ typedef struct { /* * KMeans */ -typedef struct { - size_t num_clusters; - size_t max_iterations; +typedef struct { std::vector<DSample> cluster_centers; calculated_number_t min_dist; calculated_number_t max_dist; + + uint32_t after; + uint32_t before; } ml_kmeans_t; typedef struct machine_learning_stats_t { @@ -123,6 +124,7 @@ enum ml_training_result { typedef struct { // Chart/dimension we want to train + STRING *host_id; STRING *chart_id; STRING *dimension_id; @@ -168,6 +170,7 @@ typedef struct { /* * Queue */ + typedef struct { std::queue<ml_training_request_t> internal; netdata_mutex_t mutex; @@ -175,7 +178,6 @@ typedef struct { std::atomic<bool> exit; } ml_queue_t; - typedef struct { RRDDIM *rd; @@ -207,19 +209,12 @@ typedef struct { RRDHOST *rh; ml_machine_learning_stats_t mls; - ml_training_stats_t ts; calculated_number_t host_anomaly_rate; - std::atomic<bool> threads_running; - std::atomic<bool> threads_cancelled; - std::atomic<bool> threads_joined; - - ml_queue_t *training_queue; - netdata_mutex_t mutex; - netdata_thread_t training_thread; + ml_queue_t *training_queue; /* * bookkeeping for anomaly detection charts @@ -249,6 +244,19 @@ typedef struct { RRDSET *detector_events_rs; RRDDIM *detector_events_above_threshold_rd; RRDDIM *detector_events_new_anomaly_event_rd; +} ml_host_t; + +typedef struct { + size_t id; + netdata_thread_t nd_thread; + netdata_mutex_t nd_mutex; + + ml_queue_t *training_queue; + ml_training_stats_t training_stats; + + calculated_number_t *training_cns; + calculated_number_t *scratch_training_cns; + std::vector<DSample> training_samples; RRDSET *queue_stats_rs; RRDDIM *queue_stats_queue_size_rd; @@ -265,7 +273,7 @@ typedef struct { RRDDIM *training_results_not_enough_collected_values_rd; RRDDIM *training_results_null_acquired_dimension_rd; RRDDIM *training_results_chart_under_replication_rd; -} ml_host_t; +} ml_training_thread_t; typedef struct { bool enable_anomaly_detection; @@ -302,6 +310,14 @@ typedef struct { std::vector<uint32_t> random_nums; netdata_thread_t detection_thread; + std::atomic<bool> detection_stop; + + size_t num_training_threads; + + std::vector<ml_training_thread_t> training_threads; + std::atomic<bool> training_stop; + + bool enable_statistics_charts; } ml_config_t; void ml_config_load(ml_config_t *cfg); @@ -7,15 +7,18 @@ #include <random> #include "ad_charts.h" +#include "database/sqlite/sqlite3.h" -typedef struct { - calculated_number_t *training_cns; - calculated_number_t *scratch_training_cns; - - std::vector<DSample> training_samples; -} ml_tls_data_t; +#define WORKER_TRAIN_QUEUE_POP 0 +#define WORKER_TRAIN_ACQUIRE_DIMENSION 1 +#define WORKER_TRAIN_QUERY 2 +#define WORKER_TRAIN_KMEANS 3 +#define WORKER_TRAIN_UPDATE_MODELS 4 +#define WORKER_TRAIN_RELEASE_DIMENSION 5 +#define WORKER_TRAIN_UPDATE_HOST 6 +#define WORKER_TRAIN_LOAD_MODELS 7 -static thread_local ml_tls_data_t tls_data; +static sqlite3 *db = NULL; /* * Functions to convert enums to strings @@ -173,26 +176,26 @@ ml_features_preprocess(ml_features_t *features) */ static void -ml_kmeans_init(ml_kmeans_t *kmeans, size_t num_clusters, size_t max_iterations) +ml_kmeans_init(ml_kmeans_t *kmeans) { - kmeans->num_clusters = num_clusters; - kmeans->max_iterations = max_iterations; - - kmeans->cluster_centers.reserve(kmeans->num_clusters); + kmeans->cluster_centers.reserve(2); kmeans->min_dist = std::numeric_limits<calculated_number_t>::max(); kmeans->max_dist = std::numeric_limits<calculated_number_t>::min(); } static void -ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features) +ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features, time_t after, time_t before) { + kmeans->after = (uint32_t) after; + kmeans->before = (uint32_t) before; + kmeans->min_dist = std::numeric_limits<calculated_number_t>::max(); kmeans->max_dist = std::numeric_limits<calculated_number_t>::min(); kmeans->cluster_centers.clear(); - dlib::pick_initial_centers(kmeans->num_clusters, kmeans->cluster_centers, features->preprocessed_features); - dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, kmeans->max_iterations); + dlib::pick_initial_centers(2, kmeans->cluster_centers, features->preprocessed_features); + dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, Cfg.max_kmeans_iters); for (const auto &preprocessed_feature : features->preprocessed_features) { calculated_number_t mean_dist = 0.0; @@ -201,7 +204,7 @@ ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features) mean_dist += dlib::length(cluster_center - preprocessed_feature); } - mean_dist /= kmeans->num_clusters; + mean_dist /= kmeans->cluster_centers.size(); if (mean_dist < kmeans->min_dist) kmeans->min_dist = mean_dist; @@ -218,7 +221,7 @@ ml_kmeans_anomaly_score(const ml_kmeans_t *kmeans, const DSample &DS) for (const auto &CC: kmeans->cluster_centers) mean_dist += dlib::length(CC - DS); - mean_dist /= kmeans->num_clusters; + mean_dist /= kmeans->cluster_centers.size(); if (kmeans->max_dist == kmeans->min_dist) return 0.0; @@ -264,7 +267,14 @@ ml_queue_pop(ml_queue_t *q) { netdata_mutex_lock(&q->mutex); - ml_training_request_t req = { NULL, NULL, 0, 0, 0 }; + ml_training_request_t req = { + NULL, // host_id + NULL, // chart id + NULL, // dimension id + 0, // current time + 0, // first entry + 0 // last entry + }; while (q->internal.empty()) { pthread_cond_wait(&q->cond_var, &q->mutex); @@ -307,7 +317,7 @@ ml_queue_signal(ml_queue_t *q) */ static std::pair<calculated_number_t *, ml_training_response_t> -ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t &training_request) +ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request) { ml_training_response_t training_response = {}; @@ -348,7 +358,7 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t STORAGE_PRIORITY_BEST_EFFORT); size_t idx = 0; - memset(tls_data.training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1)); + memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1)); calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN(); while (!storage_engine_query_is_finished(&handle)) { @@ -365,11 +375,11 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t training_response.db_after_t = timestamp; training_response.db_before_t = timestamp; - tls_data.training_cns[idx] = value; - last_value = tls_data.training_cns[idx]; + training_thread->training_cns[idx] = value; + last_value = training_thread->training_cns[idx]; training_response.collected_values++; } else - tls_data.training_cns[idx] = last_value; + training_thread->training_cns[idx] = last_value; idx++; } @@ -384,20 +394,270 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t } // Find first non-NaN value. - for (idx = 0; std::isnan(tls_data.training_cns[idx]); idx++, training_response.total_values--) { } + for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { } // Overwrite NaN values. if (idx != 0) - memmove(tls_data.training_cns, &tls_data.training_cns[idx], sizeof(calculated_number_t) * training_response.total_values); + memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values); training_response.result = TRAINING_RESULT_OK; - return { tls_data.training_cns, training_response }; + return { training_thread->training_cns, training_response }; +} + +const char *db_models_create_table = + "CREATE TABLE IF NOT EXISTS models(" + " dim_id BLOB, dim_str TEXT, after INT, before INT," + " min_dist REAL, max_dist REAL," + " c00 REAL, c01 REAL, c02 REAL, c03 REAL, c04 REAL, c05 REAL," + " c10 REAL, c11 REAL, c12 REAL, c13 REAL, c14 REAL, c15 REAL," + " PRIMARY KEY(dim_id, after)" + ");"; + +const char *db_models_add_model = + "INSERT OR REPLACE INTO models(" + " dim_id, dim_str, after, before," + " min_dist, max_dist," + " c00, c01, c02, c03, c04, c05," + " c10, c11, c12, c13, c14, c15)" + "VALUES(" + " @dim_id, @dim_str, @after, @before," + " @min_dist, @max_dist," + " @c00, @c01, @c02, @c03, @c04, @c05," + " @c10, @c11, @c12, @c13, @c14, @c15);"; + +const char *db_models_load = + "SELECT * FROM models " + "WHERE dim_id == @dim_id AND after >= @after ORDER BY before ASC;"; + +const char *db_models_delete = + "DELETE FROM models " + "WHERE dim_id = @dim_id AND before < @before;"; + +static int +ml_dimension_add_model(ml_dimension_t *dim) +{ + static __thread sqlite3_stmt *res = NULL; + int param = 0; + int rc = 0; + + if (unlikely(!db)) { + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db, db_models_add_model, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to store model, rc = %d", rc); + return 1; + } + } + + rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + char id[1024]; + snprintfz(id, 1024 - 1, "%s.%s", rrdset_id(dim->rd->rrdset), rrddim_id(dim->rd)); + rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.after); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_double(res, ++param, dim->kmeans.min_dist); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_double(res, ++param, dim->kmeans.max_dist); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + if (dim->kmeans.cluster_centers.size() != 2) + fatal("Expected 2 cluster centers, got %zu", dim->kmeans.cluster_centers.size()); + + for (const DSample &ds : dim->kmeans.cluster_centers) { + if (ds.size() != 6) + fatal("Expected dsample with 6 dimensions, got %ld", ds.size()); + + for (long idx = 0; idx != ds.size(); idx++) { + calculated_number_t cn = ds(idx); + int rc = sqlite3_bind_double(res, ++param, cn); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + } + } + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to store model, rc = %d", rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when storing model, rc = %d", rc); + + return 0; + +bind_fail: + error_report("Failed to bind parameter %d to store model, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to store model, rc = %d", rc); + return 1; +} + +static int +ml_dimension_delete_models(ml_dimension_t *dim) +{ + static __thread sqlite3_stmt *res = NULL; + int rc = 0; + int param = 0; + + if (unlikely(!db)) { + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db, db_models_delete, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to delete models, rc = %d", rc); + return 1; + } + } + + rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before - (Cfg.num_models_to_use * Cfg.train_every)); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = execute_insert(res); + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to delete models, rc = %d", rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when deleting models, rc = %d", rc); + + return 0; + +bind_fail: + error_report("Failed to bind parameter %d to delete models, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to delete models, rc = %d", rc); + return 1; +} + +static int +ml_dimension_load_models(ml_dimension_t *dim) { + std::vector<ml_kmeans_t> V; + + static __thread sqlite3_stmt *res = NULL; + int rc = 0; + int param = 0; + + if (unlikely(!db)) { + error_report("Database has not been initialized"); + return 1; + } + + if (unlikely(!res)) { + rc = prepare_statement(db, db_models_load, &res); + if (unlikely(rc != SQLITE_OK)) { + error_report("Failed to prepare statement to load models, rc = %d", rc); + return 1; + } + } + + rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + rc = sqlite3_bind_int(res, ++param, now_realtime_usec() - (Cfg.num_models_to_use * Cfg.max_train_samples)); + if (unlikely(rc != SQLITE_OK)) + goto bind_fail; + + dim->km_contexts.reserve(Cfg.num_models_to_use); + while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) { + ml_kmeans_t km; + + km.after = sqlite3_column_int(res, 2); + km.before = sqlite3_column_int(res, 3); + + km.min_dist = sqlite3_column_int(res, 4); + km.max_dist = sqlite3_column_int(res, 5); + + km.cluster_centers.resize(2); + + km.cluster_centers[0].set_size(Cfg.lag_n + 1); + km.cluster_centers[0](0) = sqlite3_column_double(res, 6); + km.cluster_centers[0](1) = sqlite3_column_double(res, 7); + km.cluster_centers[0](2) = sqlite3_column_double(res, 8); + km.cluster_centers[0](3) = sqlite3_column_double(res, 9); + km.cluster_centers[0](4) = sqlite3_column_double(res, 10); + km.cluster_centers[0](5) = sqlite3_column_double(res, 11); + + km.cluster_centers[1].set_size(Cfg.lag_n + 1); + km.cluster_centers[1](0) = sqlite3_column_double(res, 12); + km.cluster_centers[1](1) = sqlite3_column_double(res, 13); + km.cluster_centers[1](2) = sqlite3_column_double(res, 14); + km.cluster_centers[1](3) = sqlite3_column_double(res, 15); + km.cluster_centers[1](4) = sqlite3_column_double(res, 16); + km.cluster_centers[1](5) = sqlite3_column_double(res, 17); + + dim->km_contexts.push_back(km); + } + + if (unlikely(rc != SQLITE_DONE)) + error_report("Failed to load models, rc = %d", rc); + + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement when loading models, rc = %d", rc); + + return 0; + +bind_fail: + error_report("Failed to bind parameter %d to load models, rc = %d", param, rc); + rc = sqlite3_reset(res); + if (unlikely(rc != SQLITE_OK)) + error_report("Failed to reset statement to load models, rc = %d", rc); + return 1; +} + +static int +ml_dimension_update_models(ml_dimension_t *dim) +{ + int rc; + + if (dim->km_contexts.empty()) { + rc = ml_dimension_load_models(dim); + if (rc) + return rc; + } + + rc = ml_dimension_add_model(dim); + if (rc) + return rc; + + return ml_dimension_delete_models(dim); } static enum ml_training_result -ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &training_request) +ml_dimension_train_model(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request) { - auto P = ml_dimension_calculated_numbers(dim, training_request); + worker_is_busy(WORKER_TRAIN_QUERY); + auto P = ml_dimension_calculated_numbers(training_thread, dim, training_request); ml_training_response_t training_response = P.second; i |