summaryrefslogtreecommitdiffstats
path: root/ml
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2023-04-13 20:29:52 +0300
committerGitHub <noreply@github.com>2023-04-13 20:29:52 +0300
commit003df5f2b76973f898b44742b7e071ff2654343a (patch)
tree46183f6a35edb887ab8f3de8a1d6e398438a6a0f /ml
parent40f69dc20f7e40b7155d29a3f735ec4af29f4865 (diff)
Save and load ML models (#14810)
* Revert "Revert "Use static thread-pool for training. (#14702)" (#14782)" This reverts commit 5321ca8d1ef8d974a6a2b2128ca8804de6acb693. * Model I/O. * Minor changes Meant to make debugging a crash issues easier on cloud VMs: - Less verbose logging - Higher logging history - Modify installer to use debug info by default * Fix ML initialization order. * read lock hosts when running detection. * Revert debugging changes. * Update ml/Config.cc Co-authored-by: Andrew Maguire <andrewm4894@gmail.com> --------- Co-authored-by: Andrew Maguire <andrewm4894@gmail.com>
Diffstat (limited to 'ml')
-rw-r--r--ml/Config.cc12
-rw-r--r--ml/ad_charts.cc167
-rw-r--r--ml/ad_charts.h2
-rw-r--r--ml/ml-dummy.c10
-rw-r--r--ml/ml-private.h42
-rw-r--r--ml/ml.cc892
-rw-r--r--ml/ml.h11
7 files changed, 755 insertions, 381 deletions
diff --git a/ml/Config.cc b/ml/Config.cc
index 8b04590d77..415d11b838 100644
--- a/ml/Config.cc
+++ b/ml/Config.cc
@@ -34,7 +34,7 @@ void ml_config_load(ml_config_t *cfg) {
unsigned smooth_n = config_get_number(config_section_ml, "num samples to smooth", 3);
unsigned lag_n = config_get_number(config_section_ml, "num samples to lag", 5);
- double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / lag_n);
+ double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / 5.0 /* default lag_n */);
unsigned max_kmeans_iters = config_get_number(config_section_ml, "maximum number of k-means iterations", 1000);
double dimension_anomaly_rate_threshold = config_get_float(config_section_ml, "dimension anomaly score threshold", 0.99);
@@ -43,6 +43,10 @@ void ml_config_load(ml_config_t *cfg) {
std::string anomaly_detection_grouping_method = config_get(config_section_ml, "anomaly detection grouping method", "average");
time_t anomaly_detection_query_duration = config_get_number(config_section_ml, "anomaly detection grouping duration", 5 * 60);
+ size_t num_training_threads = config_get_number(config_section_ml, "num training threads", 4);
+
+ bool enable_statistics_charts = config_get_boolean(config_section_ml, "enable statistics charts", false);
+
/*
* Clamp
*/
@@ -64,6 +68,8 @@ void ml_config_load(ml_config_t *cfg) {
host_anomaly_rate_threshold = clamp(host_anomaly_rate_threshold, 0.1, 10.0);
anomaly_detection_query_duration = clamp<time_t>(anomaly_detection_query_duration, 60, 15 * 60);
+ num_training_threads = clamp<size_t>(num_training_threads, 1, 128);
+
/*
* Validate
*/
@@ -109,4 +115,8 @@ void ml_config_load(ml_config_t *cfg) {
cfg->sp_charts_to_skip = simple_pattern_create(cfg->charts_to_skip.c_str(), NULL, SIMPLE_PATTERN_EXACT, true);
cfg->stream_anomaly_detection_charts = config_get_boolean(config_section_ml, "stream anomaly detection charts", true);
+
+ cfg->num_training_threads = num_training_threads;
+
+ cfg->enable_statistics_charts = enable_statistics_charts;
}
diff --git a/ml/ad_charts.cc b/ml/ad_charts.cc
index a32ff6c650..086cd5aa02 100644
--- a/ml/ad_charts.cc
+++ b/ml/ad_charts.cc
@@ -6,7 +6,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
/*
* Machine learning status
*/
- {
+ if (Cfg.enable_statistics_charts) {
if (!host->machine_learning_status_rs) {
char id_buf[1024];
char name_buf[1024];
@@ -48,7 +48,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
/*
* Metric type
*/
- {
+ if (Cfg.enable_statistics_charts) {
if (!host->metric_type_rs) {
char id_buf[1024];
char name_buf[1024];
@@ -90,7 +90,7 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
/*
* Training status
*/
- {
+ if (Cfg.enable_statistics_charts) {
if (!host->training_status_rs) {
char id_buf[1024];
char name_buf[1024];
@@ -179,7 +179,6 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
rrdset_done(host->dimensions_rs);
}
-
}
void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number AnomalyRate) {
@@ -301,20 +300,20 @@ void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number
}
}
-void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts) {
+void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts) {
/*
* queue stats
*/
{
- if (!host->queue_stats_rs) {
+ if (!training_thread->queue_stats_rs) {
char id_buf[1024];
char name_buf[1024];
- snprintfz(id_buf, 1024, "queue_stats_on_%s", localhost->machine_guid);
- snprintfz(name_buf, 1024, "queue_stats_on_%s", rrdhost_hostname(localhost));
+ snprintfz(id_buf, 1024, "training_queue_%zu_stats", training_thread->id);
+ snprintfz(name_buf, 1024, "training_queue_%zu_stats", training_thread->id);
- host->queue_stats_rs = rrdset_create(
- host->rh,
+ training_thread->queue_stats_rs = rrdset_create(
+ localhost,
"netdata", // type
id_buf, // id
name_buf, // name
@@ -328,35 +327,35 @@ void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stat
localhost->rrd_update_every, // update_every
RRDSET_TYPE_LINE// chart_type
);
- rrdset_flag_set(host->queue_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
+ rrdset_flag_set(training_thread->queue_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
- host->queue_stats_queue_size_rd =
- rrddim_add(host->queue_stats_rs, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- host->queue_stats_popped_items_rd =
- rrddim_add(host->queue_stats_rs, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ training_thread->queue_stats_queue_size_rd =
+ rrddim_add(training_thread->queue_stats_rs, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ training_thread->queue_stats_popped_items_rd =
+ rrddim_add(training_thread->queue_stats_rs, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
}
- rrddim_set_by_pointer(host->queue_stats_rs,
- host->queue_stats_queue_size_rd, ts.queue_size);
- rrddim_set_by_pointer(host->queue_stats_rs,
- host->queue_stats_popped_items_rd, ts.num_popped_items);
+ rrddim_set_by_pointer(training_thread->queue_stats_rs,
+ training_thread->queue_stats_queue_size_rd, ts.queue_size);
+ rrddim_set_by_pointer(training_thread->queue_stats_rs,
+ training_thread->queue_stats_popped_items_rd, ts.num_popped_items);
- rrdset_done(host->queue_stats_rs);
+ rrdset_done(training_thread->queue_stats_rs);
}
/*
* training stats
*/
{
- if (!host->training_time_stats_rs) {
+ if (!training_thread->training_time_stats_rs) {
char id_buf[1024];
char name_buf[1024];
- snprintfz(id_buf, 1024, "training_time_stats_on_%s", localhost->machine_guid);
- snprintfz(name_buf, 1024, "training_time_stats_on_%s", rrdhost_hostname(localhost));
+ snprintfz(id_buf, 1024, "training_queue_%zu_time_stats", training_thread->id);
+ snprintfz(name_buf, 1024, "training_queue_%zu_time_stats", training_thread->id);
- host->training_time_stats_rs = rrdset_create(
- host->rh,
+ training_thread->training_time_stats_rs = rrdset_create(
+ localhost,
"netdata", // type
id_buf, // id
name_buf, // name
@@ -370,39 +369,39 @@ void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stat
localhost->rrd_update_every, // update_every
RRDSET_TYPE_LINE// chart_type
);
- rrdset_flag_set(host->training_time_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
-
- host->training_time_stats_allotted_rd =
- rrddim_add(host->training_time_stats_rs, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
- host->training_time_stats_consumed_rd =
- rrddim_add(host->training_time_stats_rs, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
- host->training_time_stats_remaining_rd =
- rrddim_add(host->training_time_stats_rs, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
+ rrdset_flag_set(training_thread->training_time_stats_rs, RRDSET_FLAG_ANOMALY_DETECTION);
+
+ training_thread->training_time_stats_allotted_rd =
+ rrddim_add(training_thread->training_time_stats_rs, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
+ training_thread->training_time_stats_consumed_rd =
+ rrddim_add(training_thread->training_time_stats_rs, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
+ training_thread->training_time_stats_remaining_rd =
+ rrddim_add(training_thread->training_time_stats_rs, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE);
}
- rrddim_set_by_pointer(host->training_time_stats_rs,
- host->training_time_stats_allotted_rd, ts.allotted_ut);
- rrddim_set_by_pointer(host->training_time_stats_rs,
- host->training_time_stats_consumed_rd, ts.consumed_ut);
- rrddim_set_by_pointer(host->training_time_stats_rs,
- host->training_time_stats_remaining_rd, ts.remaining_ut);
+ rrddim_set_by_pointer(training_thread->training_time_stats_rs,
+ training_thread->training_time_stats_allotted_rd, ts.allotted_ut);
+ rrddim_set_by_pointer(training_thread->training_time_stats_rs,
+ training_thread->training_time_stats_consumed_rd, ts.consumed_ut);
+ rrddim_set_by_pointer(training_thread->training_time_stats_rs,
+ training_thread->training_time_stats_remaining_rd, ts.remaining_ut);
- rrdset_done(host->training_time_stats_rs);
+ rrdset_done(training_thread->training_time_stats_rs);
}
/*
* training result stats
*/
{
- if (!host->training_results_rs) {
+ if (!training_thread->training_results_rs) {
char id_buf[1024];
char name_buf[1024];
- snprintfz(id_buf, 1024, "training_results_on_%s", localhost->machine_guid);
- snprintfz(name_buf, 1024, "training_results_on_%s", rrdhost_hostname(localhost));
+ snprintfz(id_buf, 1024, "training_queue_%zu_results", training_thread->id);
+ snprintfz(name_buf, 1024, "training_queue_%zu_results", training_thread->id);
- host->training_results_rs = rrdset_create(
- host->rh,
+ training_thread->training_results_rs = rrdset_create(
+ localhost,
"netdata", // type
id_buf, // id
name_buf, // name
@@ -416,31 +415,61 @@ void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stat
localhost->rrd_update_every, // update_every
RRDSET_TYPE_LINE// chart_type
);
- rrdset_flag_set(host->training_results_rs, RRDSET_FLAG_ANOMALY_DETECTION);
-
- host->training_results_ok_rd =
- rrddim_add(host->training_results_rs, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- host->training_results_invalid_query_time_range_rd =
- rrddim_add(host->training_results_rs, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- host->training_results_not_enough_collected_values_rd =
- rrddim_add(host->training_results_rs, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- host->training_results_null_acquired_dimension_rd =
- rrddim_add(host->training_results_rs, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- host->training_results_chart_under_replication_rd =
- rrddim_add(host->training_results_rs, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ rrdset_flag_set(training_thread->training_results_rs, RRDSET_FLAG_ANOMALY_DETECTION);
+
+ training_thread->training_results_ok_rd =
+ rrddim_add(training_thread->training_results_rs, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ training_thread->training_results_invalid_query_time_range_rd =
+ rrddim_add(training_thread->training_results_rs, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ training_thread->training_results_not_enough_collected_values_rd =
+ rrddim_add(training_thread->training_results_rs, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ training_thread->training_results_null_acquired_dimension_rd =
+ rrddim_add(training_thread->training_results_rs, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ training_thread->training_results_chart_under_replication_rd =
+ rrddim_add(training_thread->training_results_rs, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
}
- rrddim_set_by_pointer(host->training_results_rs,
- host->training_results_ok_rd, ts.training_result_ok);
- rrddim_set_by_pointer(host->training_results_rs,
- host->training_results_invalid_query_time_range_rd, ts.training_result_invalid_query_time_range);
- rrddim_set_by_pointer(host->training_results_rs,
- host->training_results_not_enough_collected_values_rd, ts.training_result_not_enough_collected_values);
- rrddim_set_by_pointer(host->training_results_rs,
- host->training_results_null_acquired_dimension_rd, ts.training_result_null_acquired_dimension);
- rrddim_set_by_pointer(host->training_results_rs,
- host->training_results_chart_under_replication_rd, ts.training_result_chart_under_replication);
-
- rrdset_done(host->training_results_rs);
+ rrddim_set_by_pointer(training_thread->training_results_rs,
+ training_thread->training_results_ok_rd, ts.training_result_ok);
+ rrddim_set_by_pointer(training_thread->training_results_rs,
+ training_thread->training_results_invalid_query_time_range_rd, ts.training_result_invalid_query_time_range);
+ rrddim_set_by_pointer(training_thread->training_results_rs,
+ training_thread->training_results_not_enough_collected_values_rd, ts.training_result_not_enough_collected_values);
+ rrddim_set_by_pointer(training_thread->training_results_rs,
+ training_thread->training_results_null_acquired_dimension_rd, ts.training_result_null_acquired_dimension);
+ rrddim_set_by_pointer(training_thread->training_results_rs,
+ training_thread->training_results_chart_under_replication_rd, ts.training_result_chart_under_replication);
+
+ rrdset_done(training_thread->training_results_rs);
+ }
+}
+
+void ml_update_global_statistics_charts(uint64_t models_consulted) {
+ if (Cfg.enable_statistics_charts) {
+ static RRDSET *st = NULL;
+ static RRDDIM *rd = NULL;
+
+ if (unlikely(!st)) {
+ st = rrdset_create_localhost(
+ "netdata" // type
+ , "ml_models_consulted" // id
+ , NULL // name
+ , NETDATA_ML_CHART_FAMILY // family
+ , NULL // context
+ , "KMeans models used for prediction" // title
+ , "models" // units
+ , NETDATA_ML_PLUGIN // plugin
+ , NETDATA_ML_MODULE_DETECTION // module
+ , NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS // priority
+ , localhost->rrd_update_every // update_every
+ , RRDSET_TYPE_AREA // chart_type
+ );
+
+ rd = rrddim_add(st, "num_models_consulted", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ }
+
+ rrddim_set_by_pointer(st, rd, (collected_number) models_consulted);
+
+ rrdset_done(st);
}
}
diff --git a/ml/ad_charts.h b/ml/ad_charts.h
index a973b44a51..349b369a24 100644
--- a/ml/ad_charts.h
+++ b/ml/ad_charts.h
@@ -9,6 +9,6 @@ void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats
void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number anomaly_rate);
-void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts);
+void ml_update_training_statistics_chart(ml_training_thread_t *training_thread, const ml_training_stats_t &ts);
#endif /* ML_ADCHARTS_H */
diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c
index 53444e246f..6ea0818c68 100644
--- a/ml/ml-dummy.c
+++ b/ml/ml-dummy.c
@@ -19,6 +19,12 @@ bool ml_streaming_enabled() {
void ml_init(void) {}
+void ml_fini(void) {}
+
+void ml_start_threads(void) {}
+
+void ml_stop_threads(void) {}
+
void ml_host_new(RRDHOST *rh) {
UNUSED(rh);
}
@@ -86,4 +92,8 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool
return false;
}
+void ml_update_global_statistics_charts(uint64_t models_consulted) {
+ UNUSED(models_consulted);
+}
+
#endif
diff --git a/ml/ml-private.h b/ml/ml-private.h
index 173b82e265..d014c71d26 100644
--- a/ml/ml-private.h
+++ b/ml/ml-private.h
@@ -33,14 +33,15 @@ typedef struct {
/*
* KMeans
*/
-typedef struct {
- size_t num_clusters;
- size_t max_iterations;
+typedef struct {
std::vector<DSample> cluster_centers;
calculated_number_t min_dist;
calculated_number_t max_dist;
+
+ uint32_t after;
+ uint32_t before;
} ml_kmeans_t;
typedef struct machine_learning_stats_t {
@@ -123,6 +124,7 @@ enum ml_training_result {
typedef struct {
// Chart/dimension we want to train
+ STRING *host_id;
STRING *chart_id;
STRING *dimension_id;
@@ -168,6 +170,7 @@ typedef struct {
/*
* Queue
*/
+
typedef struct {
std::queue<ml_training_request_t> internal;
netdata_mutex_t mutex;
@@ -175,7 +178,6 @@ typedef struct {
std::atomic<bool> exit;
} ml_queue_t;
-
typedef struct {
RRDDIM *rd;
@@ -207,19 +209,12 @@ typedef struct {
RRDHOST *rh;
ml_machine_learning_stats_t mls;
- ml_training_stats_t ts;
calculated_number_t host_anomaly_rate;
- std::atomic<bool> threads_running;
- std::atomic<bool> threads_cancelled;
- std::atomic<bool> threads_joined;
-
- ml_queue_t *training_queue;
-
netdata_mutex_t mutex;
- netdata_thread_t training_thread;
+ ml_queue_t *training_queue;
/*
* bookkeeping for anomaly detection charts
@@ -249,6 +244,19 @@ typedef struct {
RRDSET *detector_events_rs;
RRDDIM *detector_events_above_threshold_rd;
RRDDIM *detector_events_new_anomaly_event_rd;
+} ml_host_t;
+
+typedef struct {
+ size_t id;
+ netdata_thread_t nd_thread;
+ netdata_mutex_t nd_mutex;
+
+ ml_queue_t *training_queue;
+ ml_training_stats_t training_stats;
+
+ calculated_number_t *training_cns;
+ calculated_number_t *scratch_training_cns;
+ std::vector<DSample> training_samples;
RRDSET *queue_stats_rs;
RRDDIM *queue_stats_queue_size_rd;
@@ -265,7 +273,7 @@ typedef struct {
RRDDIM *training_results_not_enough_collected_values_rd;
RRDDIM *training_results_null_acquired_dimension_rd;
RRDDIM *training_results_chart_under_replication_rd;
-} ml_host_t;
+} ml_training_thread_t;
typedef struct {
bool enable_anomaly_detection;
@@ -302,6 +310,14 @@ typedef struct {
std::vector<uint32_t> random_nums;
netdata_thread_t detection_thread;
+ std::atomic<bool> detection_stop;
+
+ size_t num_training_threads;
+
+ std::vector<ml_training_thread_t> training_threads;
+ std::atomic<bool> training_stop;
+
+ bool enable_statistics_charts;
} ml_config_t;
void ml_config_load(ml_config_t *cfg);
diff --git a/ml/ml.cc b/ml/ml.cc
index b5cf6d661d..7daff86469 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -7,15 +7,18 @@
#include <random>
#include "ad_charts.h"
+#include "database/sqlite/sqlite3.h"
-typedef struct {
- calculated_number_t *training_cns;
- calculated_number_t *scratch_training_cns;
-
- std::vector<DSample> training_samples;
-} ml_tls_data_t;
+#define WORKER_TRAIN_QUEUE_POP 0
+#define WORKER_TRAIN_ACQUIRE_DIMENSION 1
+#define WORKER_TRAIN_QUERY 2
+#define WORKER_TRAIN_KMEANS 3
+#define WORKER_TRAIN_UPDATE_MODELS 4
+#define WORKER_TRAIN_RELEASE_DIMENSION 5
+#define WORKER_TRAIN_UPDATE_HOST 6
+#define WORKER_TRAIN_LOAD_MODELS 7
-static thread_local ml_tls_data_t tls_data;
+static sqlite3 *db = NULL;
/*
* Functions to convert enums to strings
@@ -173,26 +176,26 @@ ml_features_preprocess(ml_features_t *features)
*/
static void
-ml_kmeans_init(ml_kmeans_t *kmeans, size_t num_clusters, size_t max_iterations)
+ml_kmeans_init(ml_kmeans_t *kmeans)
{
- kmeans->num_clusters = num_clusters;
- kmeans->max_iterations = max_iterations;
-
- kmeans->cluster_centers.reserve(kmeans->num_clusters);
+ kmeans->cluster_centers.reserve(2);
kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
}
static void
-ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features)
+ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features, time_t after, time_t before)
{
+ kmeans->after = (uint32_t) after;
+ kmeans->before = (uint32_t) before;
+
kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
kmeans->cluster_centers.clear();
- dlib::pick_initial_centers(kmeans->num_clusters, kmeans->cluster_centers, features->preprocessed_features);
- dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, kmeans->max_iterations);
+ dlib::pick_initial_centers(2, kmeans->cluster_centers, features->preprocessed_features);
+ dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, Cfg.max_kmeans_iters);
for (const auto &preprocessed_feature : features->preprocessed_features) {
calculated_number_t mean_dist = 0.0;
@@ -201,7 +204,7 @@ ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features)
mean_dist += dlib::length(cluster_center - preprocessed_feature);
}
- mean_dist /= kmeans->num_clusters;
+ mean_dist /= kmeans->cluster_centers.size();
if (mean_dist < kmeans->min_dist)
kmeans->min_dist = mean_dist;
@@ -218,7 +221,7 @@ ml_kmeans_anomaly_score(const ml_kmeans_t *kmeans, const DSample &DS)
for (const auto &CC: kmeans->cluster_centers)
mean_dist += dlib::length(CC - DS);
- mean_dist /= kmeans->num_clusters;
+ mean_dist /= kmeans->cluster_centers.size();
if (kmeans->max_dist == kmeans->min_dist)
return 0.0;
@@ -264,7 +267,14 @@ ml_queue_pop(ml_queue_t *q)
{
netdata_mutex_lock(&q->mutex);
- ml_training_request_t req = { NULL, NULL, 0, 0, 0 };
+ ml_training_request_t req = {
+ NULL, // host_id
+ NULL, // chart id
+ NULL, // dimension id
+ 0, // current time
+ 0, // first entry
+ 0 // last entry
+ };
while (q->internal.empty()) {
pthread_cond_wait(&q->cond_var, &q->mutex);
@@ -307,7 +317,7 @@ ml_queue_signal(ml_queue_t *q)
*/
static std::pair<calculated_number_t *, ml_training_response_t>
-ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t &training_request)
+ml_dimension_calculated_numbers(ml_training_thread_t *training_thread, ml_dimension_t *dim, const ml_training_request_t &training_request)
{
ml_training_response_t training_response = {};
@@ -348,7 +358,7 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
STORAGE_PRIORITY_BEST_EFFORT);
size_t idx = 0;
- memset(tls_data.training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
+ memset(training_thread->training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN();
while (!storage_engine_query_is_finished(&handle)) {
@@ -365,11 +375,11 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
training_response.db_after_t = timestamp;
training_response.db_before_t = timestamp;
- tls_data.training_cns[idx] = value;
- last_value = tls_data.training_cns[idx];
+ training_thread->training_cns[idx] = value;
+ last_value = training_thread->training_cns[idx];
training_response.collected_values++;
} else
- tls_data.training_cns[idx] = last_value;
+ training_thread->training_cns[idx] = last_value;
idx++;
}
@@ -384,20 +394,270 @@ ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t
}
// Find first non-NaN value.
- for (idx = 0; std::isnan(tls_data.training_cns[idx]); idx++, training_response.total_values--) { }
+ for (idx = 0; std::isnan(training_thread->training_cns[idx]); idx++, training_response.total_values--) { }
// Overwrite NaN values.
if (idx != 0)
- memmove(tls_data.training_cns, &tls_data.training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
+ memmove(training_thread->training_cns, &training_thread->training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
training_response.result = TRAINING_RESULT_OK;
- return { tls_data.training_cns, training_response };
+ return { training_thread->training_cns, training_response };
+}
+
+const char *db_models_create_table =
+ "CREATE TABLE IF NOT EXISTS models("
+ " dim_id BLOB, dim_str TEXT, after INT, before INT,"
+ " min_dist REAL, max_dist REAL,"
+ " c00 REAL, c01 REAL, c02 REAL, c03 REAL, c04 REAL, c05 REAL,"
+ " c10 REAL, c11 REAL, c12 REAL, c13 REAL, c14 REAL, c15 REAL,"
+ " PRIMARY KEY(dim_id, after)"
+ ");";
+
+const char *db_models_add_model =
+ "INSERT OR REPLACE INTO models("
+ " dim_id, dim_str, after, before,"
+ " min_dist, max_dist,"
+ " c00, c01, c02, c03, c04, c05,"
+ " c10, c11, c12, c13, c14, c15)"
+ "VALUES("
+ " @dim_id, @dim_str, @after, @before,"
+ " @min_dist, @max_dist,"
+ " @c00, @c01, @c02, @c03, @c04, @c05,"
+ " @c10, @c11, @c12, @c13, @c14, @c15);";
+
+const char *db_models_load =
+ "SELECT * FROM models "
+ "WHERE dim_id == @dim_id AND after >= @after ORDER BY before ASC;";
+
+const char *db_models_delete =
+ "DELETE FROM models "
+ "WHERE dim_id = @dim_id AND before < @before;";
+
+static int
+ml_dimension_add_model(ml_dimension_t *dim)
+{
+ static __thread sqlite3_stmt *res = NULL;
+ int param = 0;
+ int rc = 0;
+
+ if (unlikely(!db)) {
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ if (unlikely(!res)) {
+ rc = prepare_statement(db, db_models_add_model, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to store model, rc = %d", rc);
+ return 1;
+ }
+ }
+
+ rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ char id[1024];
+ snprintfz(id, 1024 - 1, "%s.%s", rrdset_id(dim->rd->rrdset), rrddim_id(dim->rd));
+ rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.after);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_double(res, ++param, dim->kmeans.min_dist);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_double(res, ++param, dim->kmeans.max_dist);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ if (dim->kmeans.cluster_centers.size() != 2)
+ fatal("Expected 2 cluster centers, got %zu", dim->kmeans.cluster_centers.size());
+
+ for (const DSample &ds : dim->kmeans.cluster_centers) {
+ if (ds.size() != 6)
+ fatal("Expected dsample with 6 dimensions, got %ld", ds.size());
+
+ for (long idx = 0; idx != ds.size(); idx++) {
+ calculated_number_t cn = ds(idx);
+ int rc = sqlite3_bind_double(res, ++param, cn);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+ }
+ }
+
+ rc = execute_insert(res);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to store model, rc = %d", rc);
+
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement when storing model, rc = %d", rc);
+
+ return 0;
+
+bind_fail:
+ error_report("Failed to bind parameter %d to store model, rc = %d", param, rc);
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement to store model, rc = %d", rc);
+ return 1;
+}
+
+static int
+ml_dimension_delete_models(ml_dimension_t *dim)
+{
+ static __thread sqlite3_stmt *res = NULL;
+ int rc = 0;
+ int param = 0;
+
+ if (unlikely(!db)) {
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ if (unlikely(!res)) {
+ rc = prepare_statement(db, db_models_delete, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to delete models, rc = %d", rc);
+ return 1;
+ }
+ }
+
+ rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, (int) dim->kmeans.before - (Cfg.num_models_to_use * Cfg.train_every));
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = execute_insert(res);
+ if (unlikely(rc != SQLITE_DONE))
+ error_report("Failed to delete models, rc = %d", rc);
+
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement when deleting models, rc = %d", rc);
+
+ return 0;
+
+bind_fail:
+ error_report("Failed to bind parameter %d to delete models, rc = %d", param, rc);
+ rc = sqlite3_reset(res);
+ if (unlikely(rc != SQLITE_OK))
+ error_report("Failed to reset statement to delete models, rc = %d", rc);
+ return 1;
+}
+
+static int
+ml_dimension_load_models(ml_dimension_t *dim) {
+ std::vector<ml_kmeans_t> V;
+
+ static __thread sqlite3_stmt *res = NULL;
+ int rc = 0;
+ int param = 0;
+
+ if (unlikely(!db)) {
+ error_report("Database has not been initialized");
+ return 1;
+ }
+
+ if (unlikely(!res)) {
+ rc = prepare_statement(db, db_models_load, &res);
+ if (unlikely(rc != SQLITE_OK)) {
+ error_report("Failed to prepare statement to load models, rc = %d", rc);
+ return 1;
+ }
+ }
+
+ rc = sqlite3_bind_blob(res, ++param, &dim->rd->metric_uuid, sizeof(dim->rd->metric_uuid), SQLITE_STATIC);
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ rc = sqlite3_bind_int(res, ++param, now_realtime_usec() - (Cfg.num_models_to_use * Cfg.max_train_samples));
+ if (unlikely(rc != SQLITE_OK))
+ goto bind_fail;
+
+ dim->km_contexts.reserve(Cfg.num_models_to_use);
+ while ((rc = sqlite3_step_monitored(res)) == SQLITE_ROW) {
+ ml_kmeans_t km;
+
+ km.after = sqlite3_column_int(res, 2);
+ km.before = sqlite3_column_int(res, 3);
+
+ km.min_dist = sqlite3_column_int(res, 4);
+ km.max_dist = sqlite3_column_int(res, 5);
+
+ km.cluster_centers.resize(2);
+
+ km.cluster_centers[0].set_size(Cfg.lag_n + 1);
+ km.cluster_centers[0](0) = sqlite3_column_double(res, 6);
+ km.cluster_centers[0](1) = sqlite3_column_double(res, 7);
+ km.cluster_centers[0](2) = sqlite3_column_double(res, 8);
+ km.cluster_centers[0](3) = sqlite3_column_double(res, 9);
+ km.cluster_centers[0](4) = sqlite3_column_double(res, 10);
<