summaryrefslogtreecommitdiffstats
path: root/ml
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2023-03-10 12:20:40 +0200
committerGitHub <noreply@github.com>2023-03-10 12:20:40 +0200
commit37a06960f90c046f21c125c2b4265713da04f851 (patch)
tree54ed00a16dffb010a10242507cadb7140b28975b /ml
parenta0652435f0dea126b6603a2467d912ca8a677e36 (diff)
Refactor ML code. (#14659)
* Refactor ML code. This commit introduces only non-functional changes. Originally, the C++ code exposed C functions to be called from the rest of the agent. When we migrated from C++ to C, we did not eliminate these wrapper functions to make the PR easier to understand and keep the total LOC low. This commit removes the wrapper functions and "reclaims" the `ml_` prefix that we used for the public API of the old implementation. Also, the nlohmann Json library has been removed and its functionality was replaced with the equivalent Json functionality that we added in libnetdata's BUFFERs. * Remove missing headers from build systems. * Fix CMake build. * rrddim_free is outside of rrd "internals" now.
Diffstat (limited to 'ml')
-rw-r--r--ml/Config.cc6
-rw-r--r--ml/ad_charts.cc6
-rw-r--r--ml/ad_charts.h8
m---------ml/json0
-rw-r--r--ml/ml-dummy.c79
-rw-r--r--ml/ml-private.h (renamed from ml/nml.h)101
-rw-r--r--ml/ml.cc1262
-rw-r--r--ml/ml.h33
-rw-r--r--ml/nml.cc1134
9 files changed, 1265 insertions, 1364 deletions
diff --git a/ml/Config.cc b/ml/Config.cc
index 8a31f2bd6b..8b04590d77 100644
--- a/ml/Config.cc
+++ b/ml/Config.cc
@@ -1,12 +1,12 @@
// SPDX-License-Identifier: GPL-3.0-or-later
-#include "nml.h"
+#include "ml-private.h"
/*
* Global configuration instance to be shared between training and
* prediction threads.
*/
-nml_config_t Cfg;
+ml_config_t Cfg;
template <typename T>
static T clamp(const T& Value, const T& Min, const T& Max) {
@@ -16,7 +16,7 @@ static T clamp(const T& Value, const T& Min, const T& Max) {
/*
* Initialize global configuration variable.
*/
-void nml_config_load(nml_config_t *cfg) {
+void ml_config_load(ml_config_t *cfg) {
const char *config_section_ml = CONFIG_SECTION_ML;
bool enable_anomaly_detection = config_get_boolean(config_section_ml, "enabled", true);
diff --git a/ml/ad_charts.cc b/ml/ad_charts.cc
index d1607bf70f..a32ff6c650 100644
--- a/ml/ad_charts.cc
+++ b/ml/ad_charts.cc
@@ -2,7 +2,7 @@
#include "ad_charts.h"
-void nml_update_dimensions_chart(nml_host_t *host, const nml_machine_learning_stats_t &mls) {
+void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats_t &mls) {
/*
* Machine learning status
*/
@@ -182,7 +182,7 @@ void nml_update_dimensions_chart(nml_host_t *host, const nml_machine_learning_st
}
-void nml_update_host_and_detection_rate_charts(nml_host_t *host, collected_number AnomalyRate) {
+void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number AnomalyRate) {
/*
* Anomaly rate
*/
@@ -301,7 +301,7 @@ void nml_update_host_and_detection_rate_charts(nml_host_t *host, collected_numbe
}
}
-void nml_update_training_statistics_chart(nml_host_t *host, const nml_training_stats_t &ts) {
+void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts) {
/*
* queue stats
*/
diff --git a/ml/ad_charts.h b/ml/ad_charts.h
index 24d7cc3a58..a973b44a51 100644
--- a/ml/ad_charts.h
+++ b/ml/ad_charts.h
@@ -3,12 +3,12 @@
#ifndef ML_ADCHARTS_H
#define ML_ADCHARTS_H
-#include "nml.h"
+#include "ml-private.h"
-void nml_update_dimensions_chart(nml_host_t *host, const nml_machine_learning_stats_t &mls);
+void ml_update_dimensions_chart(ml_host_t *host, const ml_machine_learning_stats_t &mls);
-void nml_update_host_and_detection_rate_charts(nml_host_t *host, collected_number anomaly_rate);
+void ml_update_host_and_detection_rate_charts(ml_host_t *host, collected_number anomaly_rate);
-void nml_update_training_statistics_chart(nml_host_t *host, const nml_training_stats_t &ts);
+void ml_update_training_statistics_chart(ml_host_t *host, const ml_training_stats_t &ts);
#endif /* ML_ADCHARTS_H */
diff --git a/ml/json b/ml/json
deleted file mode 160000
-Subproject 0b345b20c888f7dc8888485768e4bf9a6be29de
diff --git a/ml/ml-dummy.c b/ml/ml-dummy.c
index dfbadcd5a3..53444e246f 100644
--- a/ml/ml-dummy.c
+++ b/ml/ml-dummy.c
@@ -8,78 +8,81 @@ bool ml_capable() {
return false;
}
-bool ml_enabled(RRDHOST *RH) {
- (void) RH;
+bool ml_enabled(RRDHOST *rh) {
+ UNUSED(rh);
+ return false;
+}
+
+bool ml_streaming_enabled() {
return false;
}
void ml_init(void) {}
-void ml_host_new(RRDHOST *RH) {
- UNUSED(RH);
+void ml_host_new(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_host_delete(RRDHOST *RH) {
- UNUSED(RH);
+void ml_host_delete(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_chart_new(RRDSET *RS) {
- UNUSED(RS);
+void ml_host_start_training_thread(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_chart_delete(RRDSET *RS) {
- UNUSED(RS);
+void ml_host_stop_training_thread(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_dimension_new(RRDDIM *RD) {
- UNUSED(RD);
+void ml_host_cancel_training_thread(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_dimension_delete(RRDDIM *RD) {
- UNUSED(RD);
+void ml_host_get_info(RRDHOST *rh, BUFFER *wb) {
+ UNUSED(rh);
+ UNUSED(wb);
}
-void ml_start_training_thread(RRDHOST *RH) {
- UNUSED(RH);
+void ml_host_get_models(RRDHOST *rh, BUFFER *wb) {
+ UNUSED(rh);
+ UNUSED(wb);
}
-void ml_stop_training_thread(RRDHOST *RH) {
- UNUSED(RH);
+void ml_host_get_runtime_info(RRDHOST *rh) {
+ UNUSED(rh);
}
-void ml_get_host_info(RRDHOST *RH, BUFFER *wb) {
- (void) RH;
- (void) wb;
+void ml_chart_new(RRDSET *rs) {
+ UNUSED(rs);
}
-char *ml_get_host_runtime_info(RRDHOST *RH) {
- (void) RH;
- return NULL;
+void ml_chart_delete(RRDSET *rs) {
+ UNUSED(rs);
}
-bool ml_chart_update_begin(RRDSET *RS) {
- (void) RS;
+bool ml_chart_update_begin(RRDSET *rs) {
+ UNUSED(rs);
return false;
}
-void ml_chart_update_end(RRDSET *RS) {
- (void) RS;
+void ml_chart_update_end(RRDSET *rs) {
+ UNUSED(rs);
}
-char *ml_get_host_models(RRDHOST *RH) {
- (void) RH;
- return NULL;
+void ml_dimension_new(RRDDIM *rd) {
+ UNUSED(rd);
}
-bool ml_is_anomalous(RRDDIM *RD, time_t CurrT, double Value, bool Exists) {
- (void) RD;
- (void) CurrT;
- (void) Value;
- (void) Exists;
- return false;
+void ml_dimension_delete(RRDDIM *rd) {
+ UNUSED(rd);
}
-bool ml_streaming_enabled() {
+bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool exists) {
+ UNUSED(rd);
+ UNUSED(curr_time);
+ UNUSED(value);
+ UNUSED(exists);
return false;
}
diff --git a/ml/nml.h b/ml/ml-private.h
index e7363087c5..173b82e265 100644
--- a/ml/nml.h
+++ b/ml/ml-private.h
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
-#ifndef NETDATA_NML_H
-#define NETDATA_NML_H
+#ifndef NETDATA_ML_PRIVATE_H
+#define NETDATA_ML_PRIVATE_H
#include "dlib/matrix.h"
#include "ml/ml.h"
@@ -28,7 +28,7 @@ typedef struct {
size_t src_n;
std::vector<DSample> &preprocessed_features;
-} nml_features_t;
+} ml_features_t;
/*
* KMeans
@@ -41,9 +41,7 @@ typedef struct {
calculated_number_t min_dist;
calculated_number_t max_dist;
-} nml_kmeans_t;
-
-#include "json/single_include/nlohmann/json.hpp"
+} ml_kmeans_t;
typedef struct machine_learning_stats_t {
size_t num_machine_learning_status_enabled;
@@ -59,11 +57,9 @@ typedef struct machine_learning_stats_t {
size_t num_anomalous_dimensions;
size_t num_normal_dimensions;
-} nml_machine_learning_stats_t;
+} ml_machine_learning_stats_t;
typedef struct training_stats_t {
- struct rusage training_ru;
-
size_t queue_size;
size_t num_popped_items;
@@ -76,9 +72,9 @@ typedef struct training_stats_t {
size_t training_result_not_enough_collected_values;
size_t training_result_null_acquired_dimension;
size_t training_result_chart_under_replication;
-} nml_training_stats_t;
+} ml_training_stats_t;
-enum nml_metric_type {
+enum ml_metric_type {
// The dimension has constant values, no need to train
METRIC_TYPE_CONSTANT,
@@ -86,7 +82,7 @@ enum nml_metric_type {
METRIC_TYPE_VARIABLE,
};
-enum nml_machine_learning_status {
+enum ml_machine_learning_status {
// Enable training/prediction
MACHINE_LEARNING_STATUS_ENABLED,
@@ -94,7 +90,7 @@ enum nml_machine_learning_status {
MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART,
};
-enum nml_training_status {
+enum ml_training_status {
// We don't have a model for this dimension
TRAINING_STATUS_UNTRAINED,
@@ -108,7 +104,7 @@ enum nml_training_status {
TRAINING_STATUS_TRAINED,
};
-enum nml_training_result {
+enum ml_training_result {
// We managed to create a KMeans model
TRAINING_RESULT_OK,
@@ -137,7 +133,7 @@ typedef struct {
// at the point the request was made
time_t first_entry_on_request;
time_t last_entry_on_request;
-} nml_training_request_t;
+} ml_training_request_t;
typedef struct {
// Time when the request for this response was made
@@ -166,71 +162,52 @@ typedef struct {
size_t total_values;
// Result of training response
- enum nml_training_result result;
-} nml_training_response_t;
+ enum ml_training_result result;
+} ml_training_response_t;
/*
* Queue
*/
-
typedef struct {
- std::queue<nml_training_request_t> internal;
+ std::queue<ml_training_request_t> internal;
netdata_mutex_t mutex;
pthread_cond_t cond_var;
std::atomic<bool> exit;
-} nml_queue_t;
+} ml_queue_t;
-nml_queue_t *nml_queue_init(void);
-void nml_queue_destroy(nml_queue_t *q);
-
-void nml_queue_push(nml_queue_t *q, const nml_training_request_t req);
-nml_training_request_t nml_queue_pop(nml_queue_t *q);
-size_t nml_queue_size(nml_queue_t *q);
-
-void nml_queue_signal(nml_queue_t *q);
typedef struct {
RRDDIM *rd;
- enum nml_metric_type mt;
- enum nml_training_status ts;
- enum nml_machine_learning_status mls;
+ enum ml_metric_type mt;
+ enum ml_training_status ts;
+ enum ml_machine_learning_status mls;
- nml_training_response_t tr;
+ ml_training_response_t tr;
time_t last_training_time;
std::vector<calculated_number_t> cns;
- std::vector<nml_kmeans_t> km_contexts;
+ std::vector<ml_kmeans_t> km_contexts;
netdata_mutex_t mutex;
- nml_kmeans_t kmeans;
+ ml_kmeans_t kmeans;
std::vector<DSample> feature;
-} nml_dimension_t;
-
-nml_dimension_t *nml_dimension_new(RRDDIM *rd);
-void nml_dimension_delete(nml_dimension_t *dim);
-
-bool nml_dimension_predict(nml_dimension_t *d, time_t curr_t, calculated_number_t value, bool exists);
+} ml_dimension_t;
typedef struct {
RRDSET *rs;
- nml_machine_learning_stats_t mls;
+ ml_machine_learning_stats_t mls;
netdata_mutex_t mutex;
-} nml_chart_t;
-
-nml_chart_t *nml_chart_new(RRDSET *rs);
-void nml_chart_delete(nml_chart_t *chart);
+} ml_chart_t;
-void nml_chart_update_begin(nml_chart_t *chart);
-void nml_chart_update_end(nml_chart_t *chart);
-void nml_chart_update_dimension(nml_chart_t *chart, nml_dimension_t *dim, bool is_anomalous);
+void ml_chart_update_dimension(ml_chart_t *chart, ml_dimension_t *dim, bool is_anomalous);
typedef struct {
RRDHOST *rh;
- nml_machine_learning_stats_t mls;
- nml_training_stats_t ts;
+ ml_machine_learning_stats_t mls;
+ ml_training_stats_t ts;
calculated_number_t host_anomaly_rate;
@@ -238,7 +215,7 @@ typedef struct {
std::atomic<bool> threads_cancelled;
std::atomic<bool> threads_joined;
- nml_queue_t *training_queue;
+ ml_queue_t *training_queue;
netdata_mutex_t mutex;
@@ -288,17 +265,7 @@ typedef struct {
RRDDIM *training_results_not_enough_collected_values_rd;
RRDDIM *training_results_null_acquired_dimension_rd;
RRDDIM *training_results_chart_under_replication_rd;
-} nml_host_t;
-
-nml_host_t *nml_host_new(RRDHOST *rh);
-void nml_host_delete(nml_host_t *host);
-
-void nml_host_start_training_thread(nml_host_t *host);
-void nml_host_stop_training_thread(nml_host_t *host, bool join);
-
-void nml_host_get_config_as_json(nml_host_t *host, BUFFER *wb);
-void nml_host_get_models_as_json(nml_host_t *host, nlohmann::json &j);
-void nml_host_get_detection_info_as_json(nml_host_t *host, nlohmann::json &j);
+} ml_host_t;
typedef struct {
bool enable_anomaly_detection;
@@ -335,12 +302,10 @@ typedef struct {
std::vector<uint32_t> random_nums;
netdata_thread_t detection_thread;
-} nml_config_t;
-
-void nml_config_load(nml_config_t *cfg);
+} ml_config_t;
-void *nml_detect_main(void *arg);
+void ml_config_load(ml_config_t *cfg);
-extern nml_config_t Cfg;
+extern ml_config_t Cfg;
-#endif /* NETDATA_NML_H */
+#endif /* NETDATA_ML_PRIVATE_H */
diff --git a/ml/ml.cc b/ml/ml.cc
index 43b49aa5de..cf9ea379a6 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -1,14 +1,969 @@
// SPDX-License-Identifier: GPL-3.0-or-later
-#include "nml.h"
+#include <dlib/clustering.h>
+
+#include "ml-private.h"
#include <random>
-bool ml_capable() {
+#include "ad_charts.h"
+
+typedef struct {
+ calculated_number_t *training_cns;
+ calculated_number_t *scratch_training_cns;
+
+ std::vector<DSample> training_samples;
+} ml_tls_data_t;
+
+static thread_local ml_tls_data_t tls_data;
+
+/*
+ * Functions to convert enums to strings
+*/
+
+__attribute__((unused)) static const char *
+ml_machine_learning_status_to_string(enum ml_machine_learning_status mls)
+{
+ switch (mls) {
+ case MACHINE_LEARNING_STATUS_ENABLED:
+ return "enabled";
+ case MACHINE_LEARNING_STATUS_DISABLED_DUE_TO_EXCLUDED_CHART:
+ return "disabled-sp";
+ default:
+ return "unknown";
+ }
+}
+
+__attribute__((unused)) static const char *
+ml_metric_type_to_string(enum ml_metric_type mt)
+{
+ switch (mt) {
+ case METRIC_TYPE_CONSTANT:
+ return "constant";
+ case METRIC_TYPE_VARIABLE:
+ return "variable";
+ default:
+ return "unknown";
+ }
+}
+
+__attribute__((unused)) static const char *
+ml_training_status_to_string(enum ml_training_status ts)
+{
+ switch (ts) {
+ case TRAINING_STATUS_PENDING_WITH_MODEL:
+ return "pending-with-model";
+ case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
+ return "pending-without-model";
+ case TRAINING_STATUS_TRAINED:
+ return "trained";
+ case TRAINING_STATUS_UNTRAINED:
+ return "untrained";
+ default:
+ return "unknown";
+ }
+}
+
+__attribute__((unused)) static const char *
+ml_training_result_to_string(enum ml_training_result tr)
+{
+ switch (tr) {
+ case TRAINING_RESULT_OK:
+ return "ok";
+ case TRAINING_RESULT_INVALID_QUERY_TIME_RANGE:
+ return "invalid-query";
+ case TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES:
+ return "missing-values";
+ case TRAINING_RESULT_NULL_ACQUIRED_DIMENSION:
+ return "null-acquired-dim";
+ case TRAINING_RESULT_CHART_UNDER_REPLICATION:
+ return "chart-under-replication";
+ default:
+ return "unknown";
+ }
+}
+
+/*
+ * Features
+*/
+
+// subtract elements that are `diff_n` positions apart
+static void
+ml_features_diff(ml_features_t *features)
+{
+ if (features->diff_n == 0)
+ return;
+
+ for (size_t idx = 0; idx != (features->src_n - features->diff_n); idx++) {
+ size_t high = (features->src_n - 1) - idx;
+ size_t low = high - features->diff_n;
+
+ features->dst[low] = features->src[high] - features->src[low];
+ }
+
+ size_t n = features->src_n - features->diff_n;
+ memcpy(features->src, features->dst, n * sizeof(calculated_number_t));
+
+ for (size_t idx = features->src_n - features->diff_n; idx != features->src_n; idx++)
+ features->src[idx] = 0.0;
+}
+
+// a function that computes the window average of an array inplace
+static void
+ml_features_smooth(ml_features_t *features)
+{
+ calculated_number_t sum = 0.0;
+
+ size_t idx = 0;
+ for (; idx != features->smooth_n - 1; idx++)
+ sum += features->src[idx];
+
+ for (; idx != (features->src_n - features->diff_n); idx++) {
+ sum += features->src[idx];
+ calculated_number_t prev_cn = features->src[idx - (features->smooth_n - 1)];
+ features->src[idx - (features->smooth_n - 1)] = sum / features->smooth_n;
+ sum -= prev_cn;
+ }
+
+ for (idx = 0; idx != features->smooth_n; idx++)
+ features->src[(features->src_n - 1) - idx] = 0.0;
+}
+
+// create lag'd vectors out of the preprocessed buffer
+static void
+ml_features_lag(ml_features_t *features)
+{
+ size_t n = features->src_n - features->diff_n - features->smooth_n + 1 - features->lag_n;
+ features->preprocessed_features.resize(n);
+
+ unsigned target_num_samples = Cfg.max_train_samples * Cfg.random_sampling_ratio;
+ double sampling_ratio = std::min(static_cast<double>(target_num_samples) / n, 1.0);
+
+ uint32_t max_mt = std::numeric_limits<uint32_t>::max();
+ uint32_t cutoff = static_cast<double>(max_mt) * sampling_ratio;
+
+ size_t sample_idx = 0;
+
+ for (size_t idx = 0; idx != n; idx++) {
+ DSample &DS = features->preprocessed_features[sample_idx++];
+ DS.set_size(features->lag_n);
+
+ if (Cfg.random_nums[idx] > cutoff) {
+ sample_idx--;
+ continue;
+ }
+
+ for (size_t feature_idx = 0; feature_idx != features->lag_n + 1; feature_idx++)
+ DS(feature_idx) = features->src[idx + feature_idx];
+ }
+
+ features->preprocessed_features.resize(sample_idx);
+}
+
+static void
+ml_features_preprocess(ml_features_t *features)
+{
+ ml_features_diff(features);
+ ml_features_smooth(features);
+ ml_features_lag(features);
+}
+
+/*
+ * KMeans
+*/
+
+static void
+ml_kmeans_init(ml_kmeans_t *kmeans, size_t num_clusters, size_t max_iterations)
+{
+ kmeans->num_clusters = num_clusters;
+ kmeans->max_iterations = max_iterations;
+
+ kmeans->cluster_centers.reserve(kmeans->num_clusters);
+ kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
+ kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
+}
+
+static void
+ml_kmeans_train(ml_kmeans_t *kmeans, const ml_features_t *features)
+{
+ kmeans->min_dist = std::numeric_limits<calculated_number_t>::max();
+ kmeans->max_dist = std::numeric_limits<calculated_number_t>::min();
+
+ kmeans->cluster_centers.clear();
+
+ dlib::pick_initial_centers(kmeans->num_clusters, kmeans->cluster_centers, features->preprocessed_features);
+ dlib::find_clusters_using_kmeans(features->preprocessed_features, kmeans->cluster_centers, kmeans->max_iterations);
+
+ for (const auto &preprocessed_feature : features->preprocessed_features) {
+ calculated_number_t mean_dist = 0.0;
+
+ for (const auto &cluster_center : kmeans->cluster_centers) {
+ mean_dist += dlib::length(cluster_center - preprocessed_feature);
+ }
+
+ mean_dist /= kmeans->num_clusters;
+
+ if (mean_dist < kmeans->min_dist)
+ kmeans->min_dist = mean_dist;
+
+ if (mean_dist > kmeans->max_dist)
+ kmeans->max_dist = mean_dist;
+ }
+}
+
+static calculated_number_t
+ml_kmeans_anomaly_score(const ml_kmeans_t *kmeans, const DSample &DS)
+{
+ calculated_number_t mean_dist = 0.0;
+ for (const auto &CC: kmeans->cluster_centers)
+ mean_dist += dlib::length(CC - DS);
+
+ mean_dist /= kmeans->num_clusters;
+
+ if (kmeans->max_dist == kmeans->min_dist)
+ return 0.0;
+
+ calculated_number_t anomaly_score = 100.0 * std::abs((mean_dist - kmeans->min_dist) / (kmeans->max_dist - kmeans->min_dist));
+ return (anomaly_score > 100.0) ? 100.0 : anomaly_score;
+}
+
+/*
+ * Queue
+*/
+
+static ml_queue_t *
+ml_queue_init()
+{
+ ml_queue_t *q = new ml_queue_t();
+
+ netdata_mutex_init(&q->mutex);
+ pthread_cond_init(&q->cond_var, NULL);
+ q->exit = false;
+ return q;
+}
+
+static void
+ml_queue_destroy(ml_queue_t *q)
+{
+ netdata_mutex_destroy(&q->mutex);
+ pthread_cond_destroy(&q->cond_var);
+ delete q;
+}
+
+static void
+ml_queue_push(ml_queue_t *q, const ml_training_request_t req)
+{
+ netdata_mutex_lock(&q->mutex);
+ q->internal.push(req);
+ pthread_cond_signal(&q->cond_var);
+ netdata_mutex_unlock(&q->mutex);
+}
+
+static ml_training_request_t
+ml_queue_pop(ml_queue_t *q)
+{
+ netdata_mutex_lock(&q->mutex);
+
+ ml_training_request_t req = { NULL, NULL, 0, 0, 0 };
+
+ while (q->internal.empty()) {
+ pthread_cond_wait(&q->cond_var, &q->mutex);
+
+ if (q->exit) {
+ netdata_mutex_unlock(&q->mutex);
+
+ // We return a dummy request because the queue has been signaled
+ return req;
+ }
+ }
+
+ req = q->internal.front();
+ q->internal.pop();
+
+ netdata_mutex_unlock(&q->mutex);
+ return req;
+}
+
+static size_t
+ml_queue_size(ml_queue_t *q)
+{
+ netdata_mutex_lock(&q->mutex);
+ size_t size = q->internal.size();
+ netdata_mutex_unlock(&q->mutex);
+ return size;
+}
+
+static void
+ml_queue_signal(ml_queue_t *q)
+{
+ netdata_mutex_lock(&q->mutex);
+ q->exit = true;
+ pthread_cond_signal(&q->cond_var);
+ netdata_mutex_unlock(&q->mutex);
+}
+
+/*
+ * Dimension
+*/
+
+static std::pair<calculated_number_t *, ml_training_response_t>
+ml_dimension_calculated_numbers(ml_dimension_t *dim, const ml_training_request_t &training_request)
+{
+ ml_training_response_t training_response = {};
+
+ training_response.request_time = training_request.request_time;
+ training_response.first_entry_on_request = training_request.first_entry_on_request;
+ training_response.last_entry_on_request = training_request.last_entry_on_request;
+
+ training_response.first_entry_on_response = rrddim_first_entry_s_of_tier(dim->rd, 0);
+ training_response.last_entry_on_response = rrddim_last_entry_s_of_tier(dim->rd, 0);
+
+ size_t min_n = Cfg.min_train_samples;
+ size_t max_n = Cfg.max_train_samples;
+
+ // Figure out what our time window should be.
+ training_response.query_before_t = training_response.last_entry_on_response;
+ training_response.query_after_t = std::max(
+ training_response.query_before_t - static_cast<time_t>((max_n - 1) * dim->rd->update_every),
+ training_response.first_entry_on_response
+ );
+
+ if (training_response.query_after_t >= training_response.query_before_t) {
+ training_response.result = TRAINING_RESULT_INVALID_QUERY_TIME_RANGE;
+ return { NULL, training_response };
+ }
+
+ if (rrdset_is_replicating(dim->rd->rrdset)) {
+ training_response.result = TRAINING_RESULT_CHART_UNDER_REPLICATION;
+ return { NULL, training_response };
+ }
+
+ /*
+ * Execute the query
+ */
+ struct storage_engine_query_ops *ops = dim->rd->tiers[0].query_ops;
+ struct storage_engine_query_handle handle;
+
+ ops->init(dim->rd->tiers[0].db_metric_handle,
+ &handle,
+ training_response.query_after_t,
+ training_response.query_before_t,
+ STORAGE_PRIORITY_BEST_EFFORT);
+
+ size_t idx = 0;
+ memset(tls_data.training_cns, 0, sizeof(calculated_number_t) * max_n * (Cfg.lag_n + 1));
+ calculated_number_t last_value = std::numeric_limits<calculated_number_t>::quiet_NaN();
+
+ while (!ops->is_finished(&handle)) {
+ if (idx == max_n)
+ break;
+
+ STORAGE_POINT sp = ops->next_metric(&handle);
+
+ time_t timestamp = sp.end_time_s;
+ calculated_number_t value = sp.sum / sp.count;
+
+ if (netdata_double_isnumber(value)) {
+ if (!training_response.db_after_t)
+ training_response.db_after_t = timestamp;
+ training_response.db_before_t = timestamp;
+
+ tls_data.training_cns[idx] = value;
+ last_value = tls_data.training_cns[idx];
+ training_response.collected_values++;
+ } else
+ tls_data.training_cns[idx] = last_value;
+
+ idx++;
+ }
+ ops->finalize(&handle);
+
+ global_statistics_ml_query_completed(/* points_read */ idx);
+
+ training_response.total_values = idx;
+ if (training_response.collected_values < min_n) {
+ training_response.result = TRAINING_RESULT_NOT_ENOUGH_COLLECTED_VALUES;
+ return { NULL, training_response };
+ }
+
+ // Find first non-NaN value.
+ for (idx = 0; std::isnan(tls_data.training_cns[idx]); idx++, training_response.total_values--) { }
+
+ // Overwrite NaN values.
+ if (idx != 0)
+ memmove(tls_data.training_cns, &tls_data.training_cns[idx], sizeof(calculated_number_t) * training_response.total_values);
+
+ training_response.result = TRAINING_RESULT_OK;
+ return { tls_data.training_cns, training_response };
+}
+
+static enum ml_training_result
+ml_dimension_train_model(ml_dimension_t *dim, const ml_training_request_t &training_request)
+{
+ auto P = ml_dimension_calculated_numbers(dim, training_request);
+ ml_training_response_t training_response = P.second;
+
+ if (training_response.result != TRAINING_RESULT_OK) {
+ netdata_mutex_lock(&dim->mutex);
+
+ dim->mt = METRIC_TYPE_CONSTANT;
+
+ switch (dim->ts) {
+ case TRAINING_STATUS_PENDING_WITH_MODEL:
+ dim->ts = TRAINING_STATUS_TRAINED;
+ break;
+ case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
+ dim->ts = TRAINING_STATUS_UNTRAINED;
+ break;
+ default:
+ break;
+ }
+
+ dim->tr = training_response;
+
+ dim->last_training_time = training_response.last_entry_on_response;
+ enum ml_training_result result = training_response.result;
+ netdata_mutex_unlock(&dim->mutex);
+
+ return result;
+ }
+
+ // compute kmeans
+ {
+ memcpy(tls_data.scratch_training_cns, tls_data.training_cns,
+ training_response.total_values * sizeof(calculated_number_t));
+
+ ml_features_t features = {
+ Cfg.diff_n, Cfg.smooth_n, Cfg.lag_n,
+ tls_data.scratch_training_cns, training_response.total_values,
+ tls_data.training_cns, training_response.total_values,
+ tls_data.training_samples
+ };
+ ml_features_preprocess(&features);
+
+ ml_kmeans_init(&dim->kmeans, 2, 1000);
+ ml_kmeans_train(&dim->kmeans, &features);
+ }
+
+ // update kmeans models
+ {
+ netdata_mutex_lock(&dim->mutex);
+
+ if (dim->km_contexts.size() < Cfg.num_models_to_use) {
+ dim->km_contexts.push_back(std::move(dim->kmeans));
+ } else {
+ std::rotate(std::begin(dim->km_contexts), std::begin(dim->km_contexts) + 1, std::end(dim->km_contexts));
+ dim->km_contexts[dim->km_contexts.size() - 1] = std::move(dim->kmeans);
+ }
+
+ dim->mt = METRIC_TYPE_CONSTANT;
+ dim->ts = TRAINING_STATUS_TRAINED;
+ dim->tr = training_response;
+ dim->last_training_time = rrddim_last_entry_s(dim->rd);
+
+ netdata_mutex_unlock(&dim->mutex);
+ }
+
+ return training_response.result;
+}
+
+static void
+ml_dimension_schedule_for_training(ml_dimension_t *dim, time_t curr_time)
+{
+ switch (dim->mt) {
+ case METRIC_TYPE_CONSTANT:
+ return;
+ default:
+ break;
+ }
+
+ bool schedule_for_training = false;
+
+ switch (dim->ts) {
+ case TRAINING_STATUS_PENDING_WITH_MODEL:
+ case TRAINING_STATUS_PENDING_WITHOUT_MODEL:
+ schedule_for_training = false;
+ break;
+ case TRAINING_STATUS_UNTRAINED:
+ schedule_for_training = true;
+ dim->ts = TRAINING_STATUS_PENDING_WITHOUT_MODEL;
+ break;
+ case TRAINING_STATUS_TRAINED:
+ if ((dim->last_training_time + (Cfg.train_every * dim->rd->update_every)) < curr_time) {
+ schedule_for_training = true;
+ dim->ts = TRAINING_STATUS_PENDING_WITH_MODEL;
+ }
+ break;
+ }
+
+ if (schedule_for_training) {
+ ml_host_t *host = (ml_host_t *) dim->rd->rrdset->rrdhost->ml_host;
+ ml_training_request_t req = {
+ string_dup(dim->rd->rrdset->id), string_dup(dim->rd->id),
+ curr_time, rrddim_first_entry_s(dim->rd), rrddim_last_entry_s(dim->rd),
+ };
+ ml_queue_push(host->training_queue, req);
+ }
+}
+
+static bool
+ml_dimension_predict(ml_dimension_t *dim, time_t curr_time, calculated_number_t value, bool exists)
+{
+ // Nothing to do if ML is disabled for this dimension
+ if (dim->mls != MACHINE_LEARNING_STATUS_ENABLED)
+ return false;
+
+ // Don't treat values that don't exist as anomalous
+ if (!exists) {
+ dim->cns.clear();
+ return false;
+ }
+
+ // Save the value and return if we don't have enough values for a sample
+ unsigned n = Cfg.diff_n + Cfg.smooth_n + Cfg.lag_n;
+ if (dim->cns.size() < n) {
+ dim->cns.push_back(value);
+ return false;
+ }
+
+ // Push the value and check if it's different from the last one
+ bool same_value = true;
+ std::rotate(std::begin(dim->cns), std::begin(dim->cns) + 1, std::end(dim->cns));
+ if (dim-&