summaryrefslogtreecommitdiffstats
path: root/ml
diff options
context:
space:
mode:
Diffstat (limited to 'ml')
-rw-r--r--ml/Config.cc6
-rw-r--r--ml/ad_charts.cc6
-rw-r--r--ml/ad_charts.h8
m---------ml/json0
-rw-r--r--ml/ml-dummy.c79
-rw-r--r--ml/ml-private.h (renamed from ml/nml.h)101
-rw-r--r--ml/ml.cc1262
-rw-r--r--ml/ml.h33
-rw-r--r--ml/nml.cc1134
9 files changed, 1265 insertions, 1364 deletions
diff --git a/ml/Config.cc b/ml/Config.cc
index 8a31f2bd6b..8b04590d77 100644
--- a/ml/Config.cc
+++ b/ml/Config.cc
@@ -1,12 +1,12 @@
// SPDX-License-Identifier: GPL-3.0-or-later
-#include "nml.h"
+#include "ml-private.h"
/*
* Global configuration instance to be shared between training and
* prediction threads.
*/
-nml_config_t Cfg;
+ml_config_t Cfg;
template <typename T>
static T clamp(const T& Value, const T& Min, const T& Max) {
@@ -16,7 +16,7 @@ static T clamp(const T& Value, const T& Min, const T& Max) {
/*
* Initialize global configuration variable.
*/
-void nml_config_load(nml_config_t *cfg) {
+void ml_config_load(ml_config_t *cfg) {
const char *config_section_ml = CONFIG_SECTION_ML;
bool enable_anomaly_detection = config_get_boolean(config_section_ml, "enabled", true);
diff --git a/ml/ad_charts.cc b/ml/ad_charts.cc
index d1607bf70f..a32ff6c650 100644
--- a/ml/ad_charts.cc
+++ b/ml/ad_charts.cc
@@ -2,7 +2,7 @@
#include "ad_charts.h"
-void nml_update_dimensions_chart(nml_host_t *host, const nml_machine_learning_stats_t &mls) {
+void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats_t &mls) {
/*
* Machine learning status
*/
@@ -182,7 +182,7 @@ void nml_update_dimensions_chart(nml_host_t *host, const nml_machine_learning_st
}
-void nml_update_host_and_detection_rate_charts(nml_host_t *host, collected_number AnomalyRate) {
+void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number AnomalyRate) {
/*
* Anomaly rate
*/
@@ -301,7 +301,7 @@ void nml_update_host_and_detection_rate_charts(nml_host_t *host, collected_numbe
}
}
-void nml_update_training_statistics_chart(nml_host_t *host, const nml_training_stats_t &ts) {
+void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts) {
/*
* queue stats
*/
diff --git a/ml/ad_charts.h b/ml/ad_charts.h
index 24d7cc3a58..a973b44a51 100644
--- a/ml/ad_charts.h
+++ b/ml/ad_charts.h
@@ -3,12 +3,12 @@
#ifndef ML_ADCHARTS_H
#define ML_ADCHARTS_H
-#include "nml.h"
+#include "ml-private.h"
-void nml_update_dimensions_chart(nml_host_t *host, const nml_machine_learning_stats_t &mls);
+void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats_t &mls);
-void nml_update_host_and_detection_rate_charts(nml_host_t *host, collected_number anomaly_rate);
+void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number anomaly_rate);
-void nml_update_training_statistics_chart(nml_host_t *host, const nml_training_stats_t &ts);
+void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts);
#endif /* ML_ADCHARTS_H */
diff --git a/ml/json b/ml/json
deleted file mode 160000
-Subproject 0b345b20c888f7dc8888485768e4bf9a6be29de
diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c
index dfbadcd5a3..53444e246f 100644
--- a/ml/ml-dummy.c
+++ b/ml/ml-dummy.c
@@ -8,78 +8,81 @@ bool ml_capable() {
return false;
}
-bool ml_enabled(RRDHOST *RH) {
- (void) RH;
+bool ml_enabled(RRDHOST *rh) {
+ UNUSED(rh);
+ return false;
+}
+
+bool ml_streaming_enabled() {
return false;
}
void ml_init(void) {}
-void ml_host_new(RRDHOST *RH) {
- UNUSED(RH);
+void ml_host_new(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_host_delete(RRDHOST *RH) {
- UNUSED(RH);
+void ml_host_delete(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_chart_new(RRDSET *RS) {
- UNUSED(RS);
+void ml_host_start_training_thread(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_chart_delete(RRDSET *RS) {
- UNUSED(RS);
+void ml_host_stop_training_thread(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_dimension_new(RRDDIM *RD) {
- UNUSED(RD);
+void ml_host_cancel_training_thread(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_dimension_delete(RRDDIM *RD) {
- UNUSED(RD);
+void ml_host_get_info(RRDHOST *rh, BUFFER *wb) {
+ UNUSED(rh);
+ UNUSED(wb);
}
-void ml_start_training_thread(RRDHOST *RH) {
- UNUSED(RH);
+void ml_host_get_models(RRDHOST *rh, BUFFER *wb) {
+ UNUSED(rh);
+ UNUSED(wb);
}
-void ml_stop_training_thread(RRDHOST *RH) {
- UNUSED(RH);
+void ml_host_get_runtime_info(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_get_host_info(RRDHOST *RH, BUFFER *wb) {
- (void) RH;
- (void) wb;
+void ml_chart_new(RRDSET *rs) {
+ UNUSED(rs);
}
-char *ml_get_host_runtime_info(RRDHOST *RH) {
- (void) RH;
- return NULL;
+void ml_chart_delete(RRDSET *rs) {
+ UNUSED(rs);
}
-bool ml_chart_update_begin(RRDSET *RS) {
- (void) RS;
+bool ml_chart_update_begin(RRDSET *rs) {
+ UNUSED(rs);
return false;
}
-void ml_chart_update_end(RRDSET *RS) {
- (void) RS;
+void ml_chart_update_end(RRDSET *rs) {
+ UNUSED(rs);
}
-char *ml_get_host_models(RRDHOST *RH) {
- (void) RH;
- return NULL;
+void ml_dimension_new(RRDDIM *rd) {
+ UNUSED(rd);
}
-bool ml_is_anomalous(RRDDIM *RD, time_t CurrT, double Value, bool Exists) {
- (void) RD;
- (void) CurrT;
- (void) Value;
- (void) Exists;
- return false;
+void ml_dimension_delete(RRDDIM *rd) {
+ UNUSED(rd);
}
-bool ml_streaming_enabled() {
+bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool exists) {
+ UNUSED(rd);
+ UNUSED(curr_time);
+ UNUSED(value);
+ UNUSED(exists);
return false;
}
diff --git a/ml/nml.h b/ml/ml-private.h
index e7363087c5..173b82e265 100644
--- a/ml/nml.h
+++ b/ml/ml-private.h
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
-#ifndef NETDATA_NML_H
-#define NETDATA_NML_H
+#ifndef NETDATA_ML_PRIVATE_H
+#define NETDATA_ML_PRIVATE_H
#include "dlib/matrix.h"
#include "ml/ml.h"
@@ -28,7 +28,7 @@ typedef struct {
size_t src_n;
std::vector<DSample> &preprocessed_features;
-} nml_features_t;
+} ml_features_t;
/*
* KMeans
@@ -41,9 +41,7 @@ typedef struct {
calculated_number_t min_dist;
calculated_number_t max_dist;
-} nml_kmeans_t;
-
-#include "json/single_include/nlohmann/json.hpp"
+} ml_kmeans_t;
typedef struct machine_learning_stats_t {
size_t num_machine_learning_status_enabled;
@@ -59,11 +57,9 @@ typedef struct machine_learning_stats_t {
size_t num_anomalous_dimensions;
size_t num_normal_dimensions;
-} nml_machine_learning_stats_t;
+} ml_machine_learning_stats_t;
typedef struct training_stats_t {
- struct rusage training_ru;
-
size_t queue_size;
size_t num_popped_items;
@@ -76,9 +72,9 @@ typedef struct training_stats_t {
size_t training_result_not_enough_collected_values;
size_t training_result_null_acquired_dimension;
size_t training_result_chart_under_replication;
-} nml_training_stats_t;
+} ml_training_stats_t;
-enum nml_metric_type {
+enum ml_metric_type {
// The dimension has constant values, no need to train
METRIC_TYPE_CONSTANT,
@@ -86,7 +82,7 @@ enum nml_metric_type {
METRIC_TYPE_VARIABLE,
};
-enum nml_machine_learning_status {
+enum ml_machine_learning_status {
// Enable training/prediction
MACHINE_LEARNING_STATUS_ENABLED,
@@ -94,7 +90,7 @@ enum nml_machine_learning_status {
MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART,
};
-enum nml_training_status {
+enum ml_training_status {
// We don't have a model for this dimension
TRAINING_STATUS_UNTRAINED,
@@ -108,7 +104,7 @@ enum nml_training_status {
TRAINING_STATUS_TRAINED,
};
-enum nml_training_result {
+enum ml_training_result {
// We managed to create a KMeans model
TRAINING_RESULT_OK,
@@ -137,7 +133,7 @@ typedef struct {
// at the point the request was made
time_t first_entry_on_request;
time_t last_entry_on_request;
-} nml_training_request_t;
+} ml_training_request_t;
typedef struct {
// Time when the request for this response was made
@@ -166,71 +162,52 @@ typedef struct {
size_t total_values;
// Result of training response
- enum nml_training_result result;
-} nml_training_response_t;
+ enum ml_training_result result;
+} ml_training_response_t;
/*
* Queue
*/
-
typedef struct {
- std::queue<nml_training_request_t> internal;
+ std::queue<ml_training_request_t> internal;
netdata_mutex_t mutex;
pthread_cond_t cond_var;
std::atomic<bool> exit;
-} nml_queue_t;
+} ml_queue_t;
-nml_queue_t *nml_queue_init(void);
-void nml_queue_destroy(nml_queue_t *q);
-
-void nml_queue_push(nml_queue_t *q, const nml_training_request_t req);
-nml_training_request_t nml_queue_pop(nml_queue_t *q);
-size_t nml_queue_size(nml_queue_t *q);
-
-void nml_queue_signal(nml_queue_t *q);
typedef struct {
RRDDIM *rd;
- enum nml_metric_type mt;
- enum nml_training_status ts;
- enum nml_machine_learning_status mls;
+ enum ml_metric_type mt;
+ enum ml_training_status ts;
+ enum ml_machine_learning_status mls;
- nml_training_response_t tr;
+ ml_training_response_t tr;
time_t last_training_time;
std::vector<calculated_number_t> cns;
- std::vector<nml_kmeans_t> km_contexts;
+ std::vector<ml_kmeans_t> km_contexts;
netdata_mutex_t mutex;
- nml_kmeans_t kmeans;
+ ml_kmeans_t kmeans;
std::vector<DSample> feature;
-} nml_dimension_t;
-
-nml_dimension_t *nml_dimension_new(RRDDIM *rd);
-void nml_dimension_delete(nml_dimension_t *dim);
-
-bool nml_dimension_predict(nml_dimension_t *d, time_t curr_t, calculated_number_t value, bool exists);
+} ml_dimension_t;
typedef struct {
RRDSET *rs;
- nml_machine_learning_stats_t mls;
+ ml_machine_learning_stats_t mls;
netdata_mutex_t mutex;
-} nml_chart_t;
-
-nml_chart_t *nml_chart_new(RRDSET *rs);
-void nml_chart_delete(nml_chart_t *chart);
+} ml_chart_t;
-void nml_chart_update_begin(nml_chart_t *chart);
-void nml_chart_update_end(nml_chart_t *chart);
-void nml_chart_update_dimension(nml_chart_t *chart, nml_dimension_t *dim, bool is_anomalous);
+void ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_anomalous);
typedef struct {
RRDHOST *rh;
- nml_machine_learning_stats_t mls;
- nml_training_stats_t ts;
+ ml_machine_learning_stats_t mls;
+ ml_training_stats_t ts;
calculated_number_t host_anomaly_rate;
@@ -238,7 +215,7 @@ typedef struct {
std::atomic<bool> threads_cancelled;
std::atomic<bool> threads_joined;
- nml_queue_t *training_queue;
+ ml_queue_t *training_queue;
netdata_mutex_t mutex;
@@ -288,17 +265,7 @@ typedef struct {
RRDDIM *training_results_not_enough_collected_values_rd;
RRDDIM *training_results_null_acquired_dimension_rd;
RRDDIM *training_results_chart_under_replication_rd;
-} nml_host_t;
-
-nml_host_t *nml_host_new(RRDHOST *rh);
-void nml_host_delete(nml_host_t *host);
-
-void nml_host_start_training_thread(nml_host_t *host);
-void nml_host_stop_training_thread(nml_host_t *host, bool join);
-
-void nml_host_get_config_as_json(nml_host_t *host, BUFFER *wb);
-void nml_host_get_models_as_json(nml_host_t *host, nlohmann::json &j);
-void nml_host_get_detection_info_as_json(nml_host_t *host, nlohmann::json &j);
+} ml_host_t;
typedef struct {
bool enable_anomaly_detection;
@@ -335,12 +302,10 @@ typedef struct {
std::vector<uint32_t> random_nums;
netdata_thread_t detection_thread;
-} nml_config_t;
-
-void nml_config_load(nml_config_t *cfg);
+} ml_config_t;
-void *nml_detect_main(void *arg);
+void ml_config_load(ml_config_t *cfg);
-extern nml_config_t Cfg;
+extern ml_config_t Cfg;
-#endif /* NETDATA_NML_H */
+#endif /* NETDATA_ML_PRIVATE_H */
diff --git a/ml/ml.cc b/ml/ml.cc
index 43b49aa5de..cf9ea379a6 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -1,14 +1,969 @@
// SPDX-License-Identifier: GPL-3.0-or-later
-#include "nml.h"
+#include <dlib/clustering.h>
+
+#include "ml-private.h"
#include <random>
-bool ml_capable() {
+#include "ad_charts.h"
+
+typedef struct {
+ calculated_number_t *training_cns;
+ calculated_number_t *scratch_training_cns;
+
+ std::vector<DSample> training_samples;
+} ml_tls_data_t;
+
+static thread_local ml_tls_data_t tls_data;
+
+/*
+ * Functions to convert enums to strings
+*/
+
+__attribute__((unused)) static const char *
+ml_machine_learning_status_to_string(enum ml_machine_learning_status mls)
+{
+ switch (mls) {
+ case MACHINE_LEARNING_STATUS_ENABLED:
+ return "enabled";
+ case MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART:
+ return "disabled-sp";
+ default:
+ return "unknown";
+ }
+}
+
+__attribute__((unused)) static const char *
+ml_metric_type_to_string(enum ml_metric_type mt)
+{
+ switch (mt) {
+ case METRIC_TYPE_CONSTANT:
+ return "constant";
+ case METRIC_TYPE_VARIABLE:
+ return "variable";
+ default:
+ return "unknown";
+ }
+}
+
+__attribute__((unused)) static const char *
+ml_training_status_to_string(enum ml_training_status ts)
+{
+ switch (ts) {
+ case TRAINING_STATUS_PENDING_WITH_MODEL:
+ return "pending-with-model";
+ case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
+ return "pending-without-model";
+ case TRAINING_STATUS_TRAINED:
+ return "trained";
+ case TRAINING_STATUS_UNTRAINED:
+ return "untrained";
+ default:
+ return "unknown";
+ }
+}
+
+__attribute__((unused)) static const char *
+ml_training_result_to_string(enum ml_training_result tr)
+{
+ switch (tr) {
+ case TRAINING_RESULT_OK:
+ return "ok";
+ case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
+ return "invalid-query";
+ case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
+ return "missing-values";
+ case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
+ return "null-acquired-dim";
+ case TRAINING_RESULT_CHART_UNDER_REPLICATION:
+ return "chart-under-replication";
+ default:
+ return "unknown";
+ }
+}
+
+/*
+ * Features
+*/
+
+// subtract elements that are `diff_n` positions apart
+static void
+ml_features_diff(ml_features_t *features)
+{
+ if (features->diff_n == 0)
+ return;
+
+ for (size_t idx = 0; idx != (features->src_n - features->diff_n); idx++) {
+ size_t high = (features->src_n - 1) - idx;
+ size_t low = high - features->diff_n;
+
+ features->dst[low] = features->src[high] - features->src[low];
+ }
+
+ size_t n = features->src_n - features->diff_n;
+ memcpy(features->src, features->dst, n * sizeof(calculated_number_t));
+
+ for (size_t idx = features->src_n - features->diff_n; idx != features->src_n; idx++)
+ features->src[idx] = 0.0;
+}
+
+// a function that computes the window average of an array inplace
+static void
+ml_features_smooth(ml_features_t *features)
+{
+ calculated_number_t sum = 0.0;
+
+ size_t idx = 0;
+ for (; idx != features->smooth_n - 1; idx++)
+ sum += features->src[idx];
+
+ for (; idx != (features->src_n - features->diff_n); idx++) {
+ sum += features->src[idx];
+ calculated_number_t prev_cn = features->src[idx - (features->smooth_n - 1)];
+ features->src[idx - (features->smooth_n - 1)] = sum / features->smooth_n;
+ sum -= prev_cn;
+ }
+
+ for (idx = 0; idx != features->smooth_n; idx++)
+ features->src[(features->src_n - 1) - idx] = 0.0;
+}
+
+// create lag'd vectors out of the preprocessed buffer
+static void
+ml_features_lag(ml_features_t *features)
+{
+ size_t n = features->src_n - features->diff_n - features->smooth_n + 1 - features->lag_n;
+ features->preprocessed_features.resize(n);
+
+ unsigned target_num_samples = Cfg.max_train_samples * Cfg.random_sampling_ratio;
+ double sampling_ratio = std::min(static_cast<double>(target_num_samples) / n, 1.0);
+
+ uint32_t max_mt = std::numeric_limits<uint32_t>::max();
+ uint32_t cutoff = static_cast<double>(max_mt) * sampling_ratio;
+
+ size_t sample_idx = 0;
+
+ for (size_t idx = 0; idx != n; idx++) {
+ DSample &DS = features->preprocessed_features[sample_idx++];
+ DS.set_size(features->lag_n);
+
+ if (Cfg.random_nums[idx] > cutoff) {
+ sample_idx--;
+ continue;
+ }
+
+ for (size_t feature_idx = 0; feature_idx != features->lag_n + 1; feature_idx++)
+ DS(feature_idx) = features->src[idx + feature_idx];
+ }
+
+ features->preprocessed_features.resize(sample_idx);
+}
+
+static void
+ml_features_preprocess(ml_features_t *features)
+{
+ ml_features_diff(features);
+ ml_features_smooth(features);
+ ml_features_lag(features);
+}
+
+/*
+ * KMeans
+*/
+
+static void
+ml_kmeans_init(ml_kmeans_t *kmeans, size_t num_clusters, size_t max_iterations)
+{
+ kmeans->num_clusters = num_clusters;
+ kmeans->max_iterations = max_iterations;
+
+ kmeans->cluster_centers.reserve(kmeans->num_clusters);
+ kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
+ kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
+}
+
+static void
+ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features)
+{
+ kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
+ kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
+
+ kmeans->cluster_centers.clear();
+
+ dlib::pick_initial_centers(kmeans->num_clusters, kmeans->cluster_centers, features->preprocessed_features);
+ dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, kmeans->max_iterations);
+
+ for (const auto &preprocessed_feature : features->preprocessed_features) {
+ calculated_number_t mean_dist = 0.0;
+
+ for (const auto &cluster_center : kmeans->cluster_centers) {
+ mean_dist += dlib::length(cluster_center - preprocessed_feature);
+ }
+
+ mean_dist /= kmeans->num_clusters;
+
+ if (mean_dist < kmeans->min_dist)
+ kmeans->min_dist = mean_dist;
+
+ if (mean_dist > kmeans->max_dist)
+ kmeans->max_dist = mean_dist;
+ }
+}
+
+static calculated_number_t
+ml_kmeans_anomaly_score(const ml_kmeans_t *kmeans, const DSample &DS)
+{
+ calculated_number_t mean_dist = 0.0;
+ for (const auto &CC: kmeans->cluster_centers)
+ mean_dist += dlib::length(CC - DS);
+
+ mean_dist /= kmeans->num_clusters;
+
+ if (kmeans->max_dist == kmeans->min_dist)
+ return 0.0;
+
+ calculated_number_t anomaly_score = 100.0 * std::abs((mean_dist - kmeans->min_dist) / (kmeans->max_dist - kmeans->min_dist));
+ return (anomaly_score > 100.0) ? 100.0 : anomaly_score;
+}
+
+/*
+ * Queue
+*/
+
+static ml_queue_t *
+ml_queue_init()
+{
+ ml_queue_t *q = new ml_queue_t();
+
+ netdata_mutex_init(&q->mutex);
+ pthread_cond_init(&q->cond_var, NULL);
+ q->exit = false;
+ return q;
+}
+
+static void
+ml_queue_destroy(ml_queue_t *q)
+{
+ netdata_mutex_destroy(&q->mutex);
+ pthread_cond_destroy(&q->cond_var);
+ delete q;
+}
+
+static void
+ml_queue_push(ml_queue_t *q, const ml_training_request_t req)
+{
+ netdata_mutex_lock(&q->mutex);
+ q->internal.push(req);
+ pthread_cond_signal(&q->cond_var);
+ netdata_mutex_unlock(&q->mutex);
+}
+
+static ml_training_request_t
+ml_queue_pop(ml_queue_t *q)
+{
+ netdata_mutex_lock(&q->mutex);
+
+ ml_training_request_t req = { NULL, NULL, 0, 0, 0 };
+
+ while (q->internal.empty()) {
+ pthread_cond_wait(&q->cond_var, &q->mutex);
+
+ if (q->exit) {
+ netdata_mutex_unlock(&q->mutex);
+
+ // We return a dummy request because the queue has been signaled
+ return req;
+ }
+ }
+
+ req = q->internal.front();
+ q->internal.pop();
+
+ netdata_mutex_unlock(&q->mutex);
+ return req;
+}
+
+static size_t
+ml_queue_size(ml_queue_t *q)
+{
+ netdata_mutex_lock(&q->mutex);
+ size_t size = q->internal.size();
+ netdata_mutex_unlock(&q->mutex);
+ return size;
+}
+
+static void
+ml_queue_signal(ml_queue_t *q)
+{
+ netdata_mutex_lock(&q->mutex);
+ q->exit = true;
+ pthread_cond_signal(&q->cond_var);
+ netdata_mutex_unlock(&q->mutex);
+}
+
+/*
+ * Dimension
+*/
+
+static std::pair<calculated_number_t *, ml_training_response_t>
+ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t &training_request)
+{
+ ml_training_response_t training_response = {};
+
+ training_response.request_time = training_request.request_time;
+ training_response.first_entry_on_request = training_request.first_entry_on_request;
+ training_response.last_entry_on_request = training_request.last_entry_on_request;
+
+ training_response.first_entry_on_response = rrddim_first_entry_s_of_tier(dim->rd, 0);
+ training_response.last_entry_on_response = rrddim_last_entry_s_of_tier(dim->rd, 0);
+
+ size_t min_n = Cfg.min_train_samples;
+ size_t max_n = Cfg.max_train_samples;
+
+ // Figure out what our time window should be.
+ training_response.query_before_t = training_response.last_entry_on_response;
+ training_response.query_after_t = std::max(
+ training_response.query_before_t - static_cast<time_t>((max_n - 1) * dim->rd->update_every),
+ training_response.first_entry_on_response
+ );
+
+ if (training_response.query_after_t >= training_response.query_before_t) {
+ training_response.result = TRAINING_RESULT_INVALID_QUERY_TIME_RANGE;
+ return { NULL, training_response };
+ }
+
+ if (rrdset_is_replicating(dim->rd->rrdset)) {
+ training_response.result = TRAINING_RESULT_CHART_UNDER_REPLICATION;
+ return { NULL, training_response };
+ }
+
+ /*
+ * Execute the query
+ */
+ struct storage_engine_query_ops *ops = dim->rd->tiers[0].query_ops;
+ struct storage_engine_query_handle handle;
+
+ ops->init(dim->rd->tiers[0].db_metric_handle,
+ &handle,
+ training_response.query_after_t,
+ training_response.query_before_t,
+ STORAGE_PRIORITY_BEST_EFFORT);
+
+ size_t idx = 0;
+ memset(tls_data.training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
+ calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN();
+
+ while (!ops->is_finished(&handle)) {
+ if (idx == max_n)
+ break;
+
+ STORAGE_POINT sp = ops->next_metric(&handle);
+
+ time_t timestamp = sp.end_time_s;
+ calculated_number_t value = sp.sum / sp.count;
+
+ if (netdata_double_isnumber(value)) {
+ if (!training_response.db_after_t)
+ training_response.db_after_t = timestamp;
+ training_response.db_before_t = timestamp;
+
+ tls_data.training_cns[idx] = value;
+ last_value = tls_data.training_cns[idx];
+ training_response.collected_values++;
+ } else
+ tls_data.training_cns[idx] = last_value;
+
+ idx++;
+ }
+ ops->finalize(&handle);
+
+ global_statistics_ml_query_completed(/* points_read */ idx);
+
+ training_response.total_values = idx;
+ if (training_response.collected_values < min_n) {
+ training_response.result = TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES;
+ return { NULL, training_response };
+ }
+
+ // Find first non-NaN value.
+ for (idx = 0; std::isnan(tls_data.training_cns[idx]); idx++, training_response.total_values--) { }
+
+ // Overwrite NaN values.
+ if (idx != 0)
+ memmove(tls_data.training_cns, &tls_data.training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
+
+ training_response.result = TRAINING_RESULT_OK;
+ return { tls_data.training_cns, training_response };
+}
+
+static enum ml_training_result
+ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &training_request)
+{
+ auto P = ml_dimension_calculated_numbers(dim, training_request);
+ ml_training_response_t training_response = P.second;
+
+ if (training_response.result != TRAINING_RESULT_OK) {
+ netdata_mutex_lock(&dim->mutex);
+
+ dim->mt = METRIC_TYPE_CONSTANT;
+
+ switch (dim->ts) {
+ case TRAINING_STATUS_PENDING_WITH_MODEL:
+ dim->ts = TRAINING_STATUS_TRAINED;
+ break;
+ case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
+ dim->ts = TRAINING_STATUS_UNTRAINED;
+ break;
+ default:
+ break;
+ }
+
+ dim->tr = training_response;
+
+ dim->last_training_time = training_response.last_entry_on_response;
+ enum ml_training_result result = training_response.result;
+ netdata_mutex_unlock(&dim->mutex);
+
+ return result;
+ }
+
+ // compute kmeans
+ {
+ memcpy(tls_data.scratch_training_cns, tls_data.training_cns,
+ training_response.total_values * sizeof(calculated_number_t));
+
+ ml_features_t features = {
+ Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
+ tls_data.scratch_training_cns, training_response.total_values,
+ tls_data.training_cns, training_response.total_values,
+ tls_data.training_samples
+ };
+ ml_features_preprocess(&features);
+
+ ml_kmeans_init(&dim->kmeans, 2, 1000);
+ ml_kmeans_train(&dim->kmeans, &features);
+ }
+
+ // update kmeans models
+ {
+ netdata_mutex_lock(&dim->mutex);
+
+ if (dim->km_contexts.size() < Cfg.num_models_to_use) {
+ dim->km_contexts.push_back(std::move(dim->kmeans));
+ } else {
+ std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts));
+ dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans);
+ }
+
+ dim->mt = METRIC_TYPE_CONSTANT;
+ dim->ts = TRAINING_STATUS_TRAINED;
+ dim->tr = training_response;
+ dim->last_training_time = rrddim_last_entry_s(dim->rd);
+
+ netdata_mutex_unlock(&dim->mutex);
+ }
+
+ return training_response.result;
+}
+
+static void
+ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time)
+{
+ switch (dim->mt) {
+ case METRIC_TYPE_CONSTANT:
+ return;
+ default:
+ break;
+ }
+
+ bool schedule_for_training = false;
+
+ switch (dim->ts) {
+ case TRAINING_STATUS_PENDING_WITH_MODEL:
+ case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
+ schedule_for_training = false;
+ break;
+ case TRAINING_STATUS_UNTRAINED:
+ schedule_for_training = true;
+ dim->ts = TRAINING_STATUS_PENDING_WITHOUT_MODEL;
+ break;
+ case TRAINING_STATUS_TRAINED:
+ if ((dim->last_training_time + (Cfg.train_every * dim->rd->update_every)) < curr_time) {
+ schedule_for_training = true;
+ dim->ts = TRAINING_STATUS_PENDING_WITH_MODEL;
+ }
+ break;
+ }
+
+ if (schedule_for_training) {
+ ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
+ ml_training_request_t req = {
+ string_dup(dim->rd->rrdset->id), string_dup(dim->rd->id),
+ curr_time, rrddim_first_entry_s(dim->rd), rrddim_last_entry_s(dim->rd),
+ };
+ ml_queue_push(host->training_queue, req);
+ }
+}
+
+static bool
+ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t value, bool exists)
+{
+ // Nothing to do if ML is disabled for this dimension
+ if (dim->mls != MACHINE_LEARNING_STATUS_ENABLED)
+ return false;
+
+ // Don't treat values that don't exist as anomalous
+ if (!exists) {
+ dim->cns.clear();
+ return false;
+ }
+
+ // Save the value and return if we don't have enough values for a sample
+ unsigned n = Cfg.diff_n + Cfg.smooth_n + Cfg.lag_n;
+ if (dim->cns.size() < n) {
+ dim->cns.push_back(value);
+ return false;
+ }
+
+ // Push the value and check if it's different from the last one
+ bool same_value = true;
+ std::rotate(std::begin(dim->cns), std::begin(dim->cns) + 1, std::end(dim->cns));
+ if (dim->cns[n - 1] != value)
+ same_value = false;
+ dim->cns[n - 1] = value;
+
+ // Create the sample
+ assert((n * (Cfg.lag_n + 1) <= 128) &&
+ "Static buffers too small to perform prediction. "
+ "This should not be possible with the default clamping of feature extraction options");
+ calculated_number_t src_cns[128];
+ calculated_number_t dst_cns[128];
+
+ memset(src_cns, 0, n * (Cfg.lag_n + 1) * sizeof(calculated_number_t));
+ memcpy(src_cns, dim->cns.data(), n * sizeof(calculated_number_t));
+ memcpy(dst_cns, dim->cns.data(), n * sizeof(calculated_number_t));
+
+ ml_features_t features = {
+ Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
+ dst_cns, n, src_cns, n,
+ dim->feature
+ };
+ ml_features_preprocess(&features);
+
+ /*
+ * Lock to predict and possibly schedule the dimension for training
+ */
+ if (netdata_mutex_trylock(&dim->mutex) != 0)
+ return false;
+
+ // Mark the metric time as variable if we received different values
+ if (!same_value)
+ dim->mt = METRIC_TYPE_VARIABLE;
+
+ // Decide if the dimension needs to be scheduled for training
+ ml_d