From 45981cb7347a5fed7ebbabba0fe193d0d9471eff Mon Sep 17 00:00:00 2001 From: vkalintiris Date: Tue, 28 Feb 2023 15:53:45 +0200 Subject: Port ML from C++ to C. (#14567) * Port ML from C++ to C. Pretty much everything is a non-functional change, ie. the functionality is identical to the one provided by the existing implementation that is written in C++. Performance-wise, this implementation: - Eliminates/reduces the number of allocations and deallocations we have to do for training/detection, - Uses just a single thread to perform detection for *all* the hosts (ie. reduces the number of required threads by 50% on parents), and - Allows training, prediction and detection of dimensions that have an update_every that is different from that of the localhost. The only C++ functionality that we still use is vectors, because they make our life easier and they are pretty much a requirement imposed by dlib. * Remove profile.plugin It was useful only for testing during development. * Limit logs to 200 lines per period * Properly generate ml_info in /api/v1/info endpoint. * Remove resource usage charts since we use worker charts. * Use a temporary to make linters happy. * Rebase. * Fix builds that have ML functionality disabled. --- ml/ADCharts.cc | 518 ----------------------- ml/ADCharts.h | 21 - ml/Chart.cc | 0 ml/Chart.h | 128 ------ ml/Config.cc | 113 +++-- ml/Config.h | 52 --- ml/Dimension.cc | 346 ---------------- ml/Dimension.h | 198 --------- ml/Host.cc | 387 ------------------ ml/Host.h | 70 ---- ml/KMeans.cc | 43 -- ml/KMeans.h | 41 -- ml/Mutex.h | 36 -- ml/Query.h | 57 --- ml/Queue.h | 66 --- ml/SamplesBuffer.cc | 183 --------- ml/SamplesBuffer.h | 149 ------- ml/Stats.h | 46 --- ml/ad_charts.cc | 446 ++++++++++++++++++++ ml/ad_charts.h | 14 + ml/ml-dummy.c | 4 +- ml/ml-private.h | 13 - ml/ml.cc | 209 +++++----- ml/ml.h | 30 +- ml/nml.cc | 1135 +++++++++++++++++++++++++++++++++++++++++++++++++++ ml/nml.h | 346 ++++++++++++++++ 26 files changed, 2114 insertions(+), 2537 deletions(-) delete mode 100644 ml/ADCharts.cc delete mode 100644 ml/ADCharts.h delete mode 100644 ml/Chart.cc delete mode 100644 ml/Chart.h delete mode 100644 ml/Config.h delete mode 100644 ml/Dimension.cc delete mode 100644 ml/Dimension.h delete mode 100644 ml/Host.cc delete mode 100644 ml/Host.h delete mode 100644 ml/KMeans.cc delete mode 100644 ml/KMeans.h delete mode 100644 ml/Mutex.h delete mode 100644 ml/Query.h delete mode 100644 ml/Queue.h delete mode 100644 ml/SamplesBuffer.cc delete mode 100644 ml/SamplesBuffer.h delete mode 100644 ml/Stats.h create mode 100644 ml/ad_charts.cc create mode 100644 ml/ad_charts.h delete mode 100644 ml/ml-private.h create mode 100644 ml/nml.cc create mode 100644 ml/nml.h (limited to 'ml') diff --git a/ml/ADCharts.cc b/ml/ADCharts.cc deleted file mode 100644 index cbb13f5d19..0000000000 --- a/ml/ADCharts.cc +++ /dev/null @@ -1,518 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "ADCharts.h" -#include "Config.h" - -void ml::updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS) { - /* - * Machine learning status - */ - { - static thread_local RRDSET *MachineLearningStatusRS = nullptr; - - static thread_local RRDDIM *Enabled = nullptr; - static thread_local RRDDIM *DisabledUE = nullptr; - static thread_local RRDDIM *DisabledSP = nullptr; - - if (!MachineLearningStatusRS) { - std::stringstream IdSS, NameSS; - - IdSS << "machine_learning_status_on_" << localhost->machine_guid; - NameSS << "machine_learning_status_on_" << rrdhost_hostname(localhost); - - MachineLearningStatusRS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.machine_learning_status", // ctx - "Machine learning status", // title - "dimensions", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(MachineLearningStatusRS , RRDSET_FLAG_ANOMALY_DETECTION); - - Enabled = rrddim_add(MachineLearningStatusRS, "enabled", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - DisabledUE = rrddim_add(MachineLearningStatusRS, "disabled-ue", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - DisabledSP = rrddim_add(MachineLearningStatusRS, "disabled-sp", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(MachineLearningStatusRS, Enabled, MLS.NumMachineLearningStatusEnabled); - rrddim_set_by_pointer(MachineLearningStatusRS, DisabledUE, MLS.NumMachineLearningStatusDisabledUE); - rrddim_set_by_pointer(MachineLearningStatusRS, DisabledSP, MLS.NumMachineLearningStatusDisabledSP); - - rrdset_done(MachineLearningStatusRS); - } - - /* - * Metric type - */ - { - static thread_local RRDSET *MetricTypesRS = nullptr; - - static thread_local RRDDIM *Constant = nullptr; - static thread_local RRDDIM *Variable = nullptr; - - if (!MetricTypesRS) { - std::stringstream IdSS, NameSS; - - IdSS << "metric_types_on_" << localhost->machine_guid; - NameSS << "metric_types_on_" << rrdhost_hostname(localhost); - - MetricTypesRS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.metric_types", // ctx - "Dimensions by metric type", // title - "dimensions", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_METRIC_TYPES, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(MetricTypesRS, RRDSET_FLAG_ANOMALY_DETECTION); - - Constant = rrddim_add(MetricTypesRS, "constant", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - Variable = rrddim_add(MetricTypesRS, "variable", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(MetricTypesRS, Constant, MLS.NumMetricTypeConstant); - rrddim_set_by_pointer(MetricTypesRS, Variable, MLS.NumMetricTypeVariable); - - rrdset_done(MetricTypesRS); - } - - /* - * Training status - */ - { - static thread_local RRDSET *TrainingStatusRS = nullptr; - - static thread_local RRDDIM *Untrained = nullptr; - static thread_local RRDDIM *PendingWithoutModel = nullptr; - static thread_local RRDDIM *Trained = nullptr; - static thread_local RRDDIM *PendingWithModel = nullptr; - - if (!TrainingStatusRS) { - std::stringstream IdSS, NameSS; - - IdSS << "training_status_on_" << localhost->machine_guid; - NameSS << "training_status_on_" << rrdhost_hostname(localhost); - - TrainingStatusRS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.training_status", // ctx - "Training status of dimensions", // title - "dimensions", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_TRAINING_STATUS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - - rrdset_flag_set(TrainingStatusRS, RRDSET_FLAG_ANOMALY_DETECTION); - - Untrained = rrddim_add(TrainingStatusRS, "untrained", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - PendingWithoutModel = rrddim_add(TrainingStatusRS, "pending-without-model", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - Trained = rrddim_add(TrainingStatusRS, "trained", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - PendingWithModel = rrddim_add(TrainingStatusRS, "pending-with-model", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(TrainingStatusRS, Untrained, MLS.NumTrainingStatusUntrained); - rrddim_set_by_pointer(TrainingStatusRS, PendingWithoutModel, MLS.NumTrainingStatusPendingWithoutModel); - rrddim_set_by_pointer(TrainingStatusRS, Trained, MLS.NumTrainingStatusTrained); - rrddim_set_by_pointer(TrainingStatusRS, PendingWithModel, MLS.NumTrainingStatusPendingWithModel); - - rrdset_done(TrainingStatusRS); - } - - /* - * Prediction status - */ - { - static thread_local RRDSET *PredictionRS = nullptr; - - static thread_local RRDDIM *Anomalous = nullptr; - static thread_local RRDDIM *Normal = nullptr; - - if (!PredictionRS) { - std::stringstream IdSS, NameSS; - - IdSS << "dimensions_on_" << localhost->machine_guid; - NameSS << "dimensions_on_" << rrdhost_hostname(localhost); - - PredictionRS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "dimensions", // family - "anomaly_detection.dimensions", // ctx - "Anomaly detection dimensions", // title - "dimensions", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - ML_CHART_PRIO_DIMENSIONS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(PredictionRS, RRDSET_FLAG_ANOMALY_DETECTION); - - Anomalous = rrddim_add(PredictionRS, "anomalous", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - Normal = rrddim_add(PredictionRS, "normal", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(PredictionRS, Anomalous, MLS.NumAnomalousDimensions); - rrddim_set_by_pointer(PredictionRS, Normal, MLS.NumNormalDimensions); - - rrdset_done(PredictionRS); - } - -} - -void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyRate) { - static thread_local RRDSET *HostRateRS = nullptr; - static thread_local RRDDIM *AnomalyRateRD = nullptr; - - if (!HostRateRS) { - std::stringstream IdSS, NameSS; - - IdSS << "anomaly_rate_on_" << localhost->machine_guid; - NameSS << "anomaly_rate_on_" << rrdhost_hostname(localhost); - - HostRateRS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "anomaly_rate", // family - "anomaly_detection.anomaly_rate", // ctx - "Percentage of anomalous dimensions", // title - "percentage", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_DETECTION, // module - ML_CHART_PRIO_ANOMALY_RATE, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(HostRateRS, RRDSET_FLAG_ANOMALY_DETECTION); - - AnomalyRateRD = rrddim_add(HostRateRS, "anomaly_rate", NULL, - 1, 100, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(HostRateRS, AnomalyRateRD, AnomalyRate); - rrdset_done(HostRateRS); - - static thread_local RRDSET *AnomalyDetectionRS = nullptr; - static thread_local RRDDIM *AboveThresholdRD = nullptr; - static thread_local RRDDIM *NewAnomalyEventRD = nullptr; - - if (!AnomalyDetectionRS) { - std::stringstream IdSS, NameSS; - - IdSS << "anomaly_detection_on_" << localhost->machine_guid; - NameSS << "anomaly_detection_on_" << rrdhost_hostname(localhost); - - AnomalyDetectionRS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "anomaly_detection", // family - "anomaly_detection.detector_events", // ctx - "Anomaly detection events", // title - "percentage", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_DETECTION, // module - ML_CHART_PRIO_DETECTOR_EVENTS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(AnomalyDetectionRS, RRDSET_FLAG_ANOMALY_DETECTION); - - AboveThresholdRD = rrddim_add(AnomalyDetectionRS, "above_threshold", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - NewAnomalyEventRD = rrddim_add(AnomalyDetectionRS, "new_anomaly_event", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - /* - * Compute the values of the dimensions based on the host rate chart - */ - ONEWAYALLOC *OWA = onewayalloc_create(0); - time_t Now = now_realtime_sec(); - time_t Before = Now - RH->rrd_update_every; - time_t After = Before - Cfg.AnomalyDetectionQueryDuration; - RRDR_OPTIONS Options = static_cast(0x00000000); - - RRDR *R = rrd2rrdr_legacy( - OWA, HostRateRS, - 1 /* points wanted */, - After, - Before, - Cfg.AnomalyDetectionGroupingMethod, - 0 /* resampling time */, - Options, "anomaly_rate", - NULL /* group options */, - 0, /* timeout */ - 0, /* tier */ - QUERY_SOURCE_ML, - STORAGE_PRIORITY_BEST_EFFORT - ); - - if(R) { - if(R->d == 1 && R->n == 1 && R->rows == 1) { - static thread_local bool PrevAboveThreshold = false; - bool AboveThreshold = R->v[0] >= Cfg.HostAnomalyRateThreshold; - bool NewAnomalyEvent = AboveThreshold && !PrevAboveThreshold; - PrevAboveThreshold = AboveThreshold; - - rrddim_set_by_pointer(AnomalyDetectionRS, AboveThresholdRD, AboveThreshold); - rrddim_set_by_pointer(AnomalyDetectionRS, NewAnomalyEventRD, NewAnomalyEvent); - rrdset_done(AnomalyDetectionRS); - } - - rrdr_free(OWA, R); - } - - onewayalloc_destroy(OWA); -} - -void ml::updateResourceUsageCharts(RRDHOST *RH, const struct rusage &PredictionRU, const struct rusage &TrainingRU) { - /* - * prediction rusage - */ - { - static thread_local RRDSET *RS = nullptr; - - static thread_local RRDDIM *User = nullptr; - static thread_local RRDDIM *System = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "prediction_usage_for_" << RH->machine_guid; - NameSS << "prediction_usage_for_" << rrdhost_hostname(RH); - - RS = rrdset_create_localhost( - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.prediction_usage", // ctx - "Prediction resource usage", // title - "milliseconds/s", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_PREDICTION, // module - NETDATA_ML_CHART_PRIO_PREDICTION_USAGE, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_STACKED // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - User = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - System = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - } - - rrddim_set_by_pointer(RS, User, PredictionRU.ru_utime.tv_sec * 1000000ULL + PredictionRU.ru_utime.tv_usec); - rrddim_set_by_pointer(RS, System, PredictionRU.ru_stime.tv_sec * 1000000ULL + PredictionRU.ru_stime.tv_usec); - - rrdset_done(RS); - } - - /* - * training rusage - */ - { - static thread_local RRDSET *RS = nullptr; - - static thread_local RRDDIM *User = nullptr; - static thread_local RRDDIM *System = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "training_usage_for_" << RH->machine_guid; - NameSS << "training_usage_for_" << rrdhost_hostname(RH); - - RS = rrdset_create_localhost( - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.training_usage", // ctx - "Training resource usage", // title - "milliseconds/s", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_TRAINING_USAGE, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_STACKED // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - User = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - System = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - } - - rrddim_set_by_pointer(RS, User, TrainingRU.ru_utime.tv_sec * 1000000ULL + TrainingRU.ru_utime.tv_usec); - rrddim_set_by_pointer(RS, System, TrainingRU.ru_stime.tv_sec * 1000000ULL + TrainingRU.ru_stime.tv_usec); - - rrdset_done(RS); - } -} - -void ml::updateTrainingStatisticsChart(RRDHOST *RH, const TrainingStats &TS) { - /* - * queue stats - */ - { - static thread_local RRDSET *RS = nullptr; - - static thread_local RRDDIM *QueueSize = nullptr; - static thread_local RRDDIM *PoppedItems = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "queue_stats_on_" << localhost->machine_guid; - NameSS << "queue_stats_on_" << rrdhost_hostname(localhost); - - RS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.queue_stats", // ctx - "Training queue stats", // title - "items", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_QUEUE_STATS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE// chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - QueueSize = rrddim_add(RS, "queue_size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - PoppedItems = rrddim_add(RS, "popped_items", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(RS, QueueSize, TS.QueueSize); - rrddim_set_by_pointer(RS, PoppedItems, TS.NumPoppedItems); - - rrdset_done(RS); - } - - /* - * training stats - */ - { - static thread_local RRDSET *RS = nullptr; - - static thread_local RRDDIM *Allotted = nullptr; - static thread_local RRDDIM *Consumed = nullptr; - static thread_local RRDDIM *Remaining = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "training_time_stats_on_" << localhost->machine_guid; - NameSS << "training_time_stats_on_" << rrdhost_hostname(localhost); - - RS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.training_time_stats", // ctx - "Training time stats", // title - "milliseconds", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_TRAINING_TIME_STATS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE// chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - Allotted = rrddim_add(RS, "allotted", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); - Consumed = rrddim_add(RS, "consumed", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); - Remaining = rrddim_add(RS, "remaining", NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(RS, Allotted, TS.AllottedUT); - rrddim_set_by_pointer(RS, Consumed, TS.ConsumedUT); - rrddim_set_by_pointer(RS, Remaining, TS.RemainingUT); - - rrdset_done(RS); - } - - /* - * training result stats - */ - { - static thread_local RRDSET *RS = nullptr; - - static thread_local RRDDIM *Ok = nullptr; - static thread_local RRDDIM *InvalidQueryTimeRange = nullptr; - static thread_local RRDDIM *NotEnoughCollectedValues = nullptr; - static thread_local RRDDIM *NullAcquiredDimension = nullptr; - static thread_local RRDDIM *ChartUnderReplication = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "training_results_on_" << localhost->machine_guid; - NameSS << "training_results_on_" << rrdhost_hostname(localhost); - - RS = rrdset_create( - RH, - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - NETDATA_ML_CHART_FAMILY, // family - "netdata.training_results", // ctx - "Training results", // title - "events", // units - NETDATA_ML_PLUGIN, // plugin - NETDATA_ML_MODULE_TRAINING, // module - NETDATA_ML_CHART_PRIO_TRAINING_RESULTS, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE// chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - Ok = rrddim_add(RS, "ok", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - InvalidQueryTimeRange = rrddim_add(RS, "invalid-queries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - NotEnoughCollectedValues = rrddim_add(RS, "not-enough-values", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - NullAcquiredDimension = rrddim_add(RS, "null-acquired-dimensions", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - ChartUnderReplication = rrddim_add(RS, "chart-under-replication", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } - - rrddim_set_by_pointer(RS, Ok, TS.TrainingResultOk); - rrddim_set_by_pointer(RS, InvalidQueryTimeRange, TS.TrainingResultInvalidQueryTimeRange); - rrddim_set_by_pointer(RS, NotEnoughCollectedValues, TS.TrainingResultNotEnoughCollectedValues); - rrddim_set_by_pointer(RS, NullAcquiredDimension, TS.TrainingResultNullAcquiredDimension); - rrddim_set_by_pointer(RS, ChartUnderReplication, TS.TrainingResultChartUnderReplication); - - rrdset_done(RS); - } -} diff --git a/ml/ADCharts.h b/ml/ADCharts.h deleted file mode 100644 index ee09669e22..0000000000 --- a/ml/ADCharts.h +++ /dev/null @@ -1,21 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_ADCHARTS_H -#define ML_ADCHARTS_H - -#include "Stats.h" -#include "ml-private.h" - -namespace ml { - -void updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS); - -void updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyRate); - -void updateResourceUsageCharts(RRDHOST *RH, const struct rusage &PredictionRU, const struct rusage &TrainingRU); - -void updateTrainingStatisticsChart(RRDHOST *RH, const TrainingStats &TS); - -} // namespace ml - -#endif /* ML_ADCHARTS_H */ diff --git a/ml/Chart.cc b/ml/Chart.cc deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/ml/Chart.h b/ml/Chart.h deleted file mode 100644 index dbd6a910f9..0000000000 --- a/ml/Chart.h +++ /dev/null @@ -1,128 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_CHART_H -#define ML_CHART_H - -#include "Config.h" -#include "Dimension.h" - -#include "ml-private.h" -#include "json/single_include/nlohmann/json.hpp" - -namespace ml -{ - -class Chart { -public: - Chart(RRDSET *RS) : - RS(RS), - MLS() - { } - - RRDSET *getRS() const { - return RS; - } - - bool isAvailableForML() { - return rrdset_is_available_for_exporting_and_alarms(RS); - } - - void addDimension(Dimension *D) { - std::lock_guard L(M); - Dimensions[D->getRD()] = D; - } - - void removeDimension(Dimension *D) { - std::lock_guard L(M); - Dimensions.erase(D->getRD()); - } - - void getModelsAsJson(nlohmann::json &Json) { - std::lock_guard L(M); - - for (auto &DP : Dimensions) { - Dimension *D = DP.second; - nlohmann::json JsonArray = nlohmann::json::array(); - for (const KMeans &KM : D->getModels()) { - nlohmann::json J; - KM.toJson(J); - JsonArray.push_back(J); - } - - Json[getMLDimensionID(D->getRD())] = JsonArray; - } - } - - void updateBegin() { - M.lock(); - MLS = {}; - } - - void updateDimension(Dimension *D, bool IsAnomalous) { - switch (D->getMLS()) { - case MachineLearningStatus::DisabledDueToUniqueUpdateEvery: - MLS.NumMachineLearningStatusDisabledUE++; - return; - case MachineLearningStatus::DisabledDueToExcludedChart: - MLS.NumMachineLearningStatusDisabledSP++; - return; - case MachineLearningStatus::Enabled: { - MLS.NumMachineLearningStatusEnabled++; - - switch (D->getMT()) { - case MetricType::Constant: - MLS.NumMetricTypeConstant++; - MLS.NumTrainingStatusTrained++; - MLS.NumNormalDimensions++; - return; - case MetricType::Variable: - MLS.NumMetricTypeVariable++; - break; - } - - switch (D->getTS()) { - case TrainingStatus::Untrained: - MLS.NumTrainingStatusUntrained++; - return; - case TrainingStatus::PendingWithoutModel: - MLS.NumTrainingStatusPendingWithoutModel++; - return; - case TrainingStatus::Trained: - MLS.NumTrainingStatusTrained++; - - MLS.NumAnomalousDimensions += IsAnomalous; - MLS.NumNormalDimensions += !IsAnomalous; - return; - case TrainingStatus::PendingWithModel: - MLS.NumTrainingStatusPendingWithModel++; - - MLS.NumAnomalousDimensions += IsAnomalous; - MLS.NumNormalDimensions += !IsAnomalous; - return; - } - - return; - } - } - } - - void updateEnd() { - M.unlock(); - } - - MachineLearningStats getMLS() { - std::lock_guard L(M); - return MLS; - } - -private: - RRDSET *RS; - MachineLearningStats MLS; - - Mutex M; - std::unordered_map Dimensions; -}; - -} // namespace ml - -#endif /* ML_CHART_H */ diff --git a/ml/Config.cc b/ml/Config.cc index cdc43d1655..d7ddff800d 100644 --- a/ml/Config.cc +++ b/ml/Config.cc @@ -1,15 +1,12 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include "Config.h" -#include "ml-private.h" - -using namespace ml; +#include "nml.h" /* * Global configuration instance to be shared between training and * prediction threads. */ -Config ml::Cfg; +nml_config_t Cfg; template static T clamp(const T& Value, const T& Min, const T& Max) { @@ -19,97 +16,97 @@ static T clamp(const T& Value, const T& Min, const T& Max) { /* * Initialize global configuration variable. */ -void Config::readMLConfig(void) { - const char *ConfigSectionML = CONFIG_SECTION_ML; +void nml_config_load(nml_config_t *cfg) { + const char *config_section_ml = CONFIG_SECTION_ML; - bool EnableAnomalyDetection = config_get_boolean(ConfigSectionML, "enabled", true); + bool enable_anomaly_detection = config_get_boolean(config_section_ml, "enabled", true); /* * Read values */ - unsigned MaxTrainSamples = config_get_number(ConfigSectionML, "maximum num samples to train", 4 * 3600); - unsigned MinTrainSamples = config_get_number(ConfigSectionML, "minimum num samples to train", 1 * 900); - unsigned TrainEvery = config_get_number(ConfigSectionML, "train every", 1 * 3600); - unsigned NumModelsToUse = config_get_number(ConfigSectionML, "number of models per dimension", 1); + unsigned max_train_samples = config_get_number(config_section_ml, "maximum num samples to train", 4 * 3600); + unsigned min_train_samples = config_get_number(config_section_ml, "minimum num samples to train", 1 * 900); + unsigned train_every = config_get_number(config_section_ml, "train every", 1 * 3600); + unsigned num_models_to_use = config_get_number(config_section_ml, "number of models per dimension", 1); - unsigned DiffN = config_get_number(ConfigSectionML, "num samples to diff", 1); - unsigned SmoothN = config_get_number(ConfigSectionML, "num samples to smooth", 3); - unsigned LagN = config_get_number(ConfigSectionML, "num samples to lag", 5); + unsigned diff_n = config_get_number(config_section_ml, "num samples to diff", 1); + unsigned smooth_n = config_get_number(config_section_ml, "num samples to smooth", 3); + unsigned lag_n = config_get_number(config_section_ml, "num samples to lag", 5); - double RandomSamplingRatio = config_get_float(ConfigSectionML, "random sampling ratio", 1.0 / LagN); - unsigned MaxKMeansIters = config_get_number(ConfigSectionML, "maximum number of k-means iterations", 1000); + double random_sampling_ratio = config_get_float(config_section_ml, "random sampling ratio", 1.0 / lag_n); + unsigned max_kmeans_iters = config_get_number(config_section_ml, "maximum number of k-means iterations", 1000); - double DimensionAnomalyScoreThreshold = config_get_float(ConfigSectionML, "dimension anomaly score threshold", 0.99); + double dimension_anomaly_rate_threshold = config_get_float(config_section_ml, "dimension anomaly score threshold", 0.99); - double HostAnomalyRateThreshold = config_get_float(ConfigSectionML, "host anomaly rate threshold", 1.0); - std::string AnomalyDetectionGroupingMethod = config_get(ConfigSectionML, "anomaly detection grouping method", "average"); - time_t AnomalyDetectionQueryDuration = config_get_number(ConfigSectionML, "anomaly detection grouping duration", 5 * 60); + double host_anomaly_rate_threshold = config_get_float(config_section_ml, "host anomaly rate threshold", 1.0); + std::string anomaly_detection_grouping_method = config_get(config_section_ml, "anomaly detection grouping method", "average"); + time_t anomaly_detection_query_duration = config_get_number(config_section_ml, "anomaly detection grouping duration", 5 * 60); /* * Clamp */ - MaxTrainSamples = clamp(MaxTrainSamples, 1 * 3600, 24 * 3600); - MinTrainSamples = clamp(MinTrainSamples, 1 * 900, 6 * 3600); - TrainEvery = clamp(TrainEvery, 1 * 3600, 6 * 3600); - NumModelsToUse = clamp(NumModelsToUse, 1, 7 * 24); + max_train_samples = clamp(max_train_samples, 1 * 3600, 24 * 3600); + min_train_samples = clamp(min_train_samples, 1 * 900, 6 * 3600); + train_every = clamp(train_every, 1 * 3600, 6 * 3600); + num_models_to_use = clamp(num_models_to_use, 1, 7 * 24); - DiffN = clamp(DiffN, 0u, 1u); - SmoothN = clamp(SmoothN, 0u, 5u); - LagN = clamp(LagN, 1u, 5u); + diff_n = clamp(diff_n, 0u, 1u); + smooth_n = clamp(smooth_n, 0u, 5u); + lag_n = clamp(lag_n, 1u, 5u); - RandomSamplingRatio = clamp(RandomSamplingRatio, 0.2, 1.0); - MaxKMeansIters = clamp(MaxKMeansIters, 500u, 1000u); + random_sampling_ratio = clamp(random_sampling_ratio, 0.2, 1.0); + max_kmeans_iters = clamp(max_kmeans_iters, 500u, 1000u); - DimensionAnomalyScoreThreshold = clamp(DimensionAnomalyScoreThreshold, 0.01, 5.00); + dimension_anomaly_rate_threshold = clamp(dimension_anomaly_rate_threshold, 0.01, 5.00); - HostAnomalyRateThreshold = clamp(HostAnomalyRateThreshold, 0.1, 10.0); - AnomalyDetectionQueryDuration = clamp(AnomalyDetectionQueryDuration, 60, 15 * 60); + host_anomaly_rate_threshold = clamp(host_anomaly_rate_threshold, 0.1, 10.0); + anomaly_detection_query_duration = clamp(anomaly_detection_query_duration, 60, 15 * 60); /* * Validate */ - if (MinTrainSamples >= MaxTrainSamples) { - error("invalid min/max train samples found (%u >= %u)", MinTrainSamples, MaxTrainSamples); + if (min_train_samples >= max_train_samples) { + error("invalid min/max train samples found (%u >= %u)", min_train_samples, max_train_samples); - MinTrainSamples = 1 * 3600; - MaxTrainSamples = 4 * 3600; + min_train_samples = 1 * 3600; + max_train_samples = 4 * 3600; } /* * Assign to config instance */ - Cfg.EnableAnomalyDetection = EnableAnomalyDetection; + cfg->enable_anomaly_detection = enable_anomaly_detection; - Cfg.MaxTrainSamples = MaxTrainSamples; - Cfg.MinTrainSamples = MinTrainSamples; - Cfg.TrainEvery = TrainEvery; - Cfg.NumModelsToUse = NumModelsToUse; + cfg->max_train_samples = max_train_samples; + cfg->min_train_samples = min_train_samples; + cfg->train_every = train_every; - Cfg.DiffN = DiffN; - Cfg.SmoothN = SmoothN; - Cfg.LagN = LagN; + cfg->num_models_to_use = num_models_to_use; - Cfg.RandomSamplingRatio = RandomSamplingRatio; - Cfg.MaxKMeansIters = MaxKMeansIters; + cfg->diff_n = diff_n; + cfg->smooth_n = smooth_n; + cfg->lag_n = lag_n; - Cfg.DimensionAnomalyScoreThreshold = DimensionAnomalyScoreThreshold; + cfg->random_sampling_ratio = random_sampling_ratio; + cfg->max_kmeans_iters = max_kmeans_iters; - Cfg.HostAnomalyRateThreshold = HostAnomalyRateThreshold; - Cfg.AnomalyDetectionGroupingMethod = time_grouping_parse( - AnomalyDetectionGroupingMethod.c_str(), RRDR_GROUPING_AVERAGE); - Cfg.AnomalyDetectionQueryDuration = AnomalyDetectionQueryDuration; + cfg->host_anomaly_rate_threshold = host_anomaly_rate_threshold; + cfg->anomaly_detection_grouping_method = + time_grouping_parse(anomaly_detection_grouping_method.c_str(), RRDR_GROUPING_AVERAGE); + cfg->anomaly_detection_query_duration = anomaly_detection_query_duration; + cfg->dimension_anomaly_score_threshold = dimension_anomaly_rate_threshold; - Cfg.HostsToSkip = config_get(ConfigSectionML, "hosts to skip from training", "!*"); - Cfg.SP_HostsToSkip = simple_pattern_create(Cfg.HostsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT); + cfg->hosts_to_skip = config_get(config_section_ml, "hosts to skip from training", "!*"); + cfg->sp_host_to_skip = simple_pattern_create(cfg->hosts_to_skip.c_str(), NULL, SIMPLE_PATTERN_EXACT); // Always exclude anomaly_detection charts from training. - Cfg.ChartsToSkip = "anomaly_detection.* "; - Cfg.ChartsToSkip += config_get(ConfigSectionML, "charts to skip from training", "netdata.*"); - Cfg.SP_ChartsToSkip = simple_pattern_create(Cfg.ChartsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT); + cfg->charts_to_skip = "anomaly_detection.* "; + cfg->charts_to_skip += config_get(config_section_ml, "charts to skip from training", "netdata.*"); + cfg->sp_charts_to_skip = simple_pattern_create(cfg->charts_to_skip.c_str(), NULL, SIMPLE_PATTERN_EXACT); - Cfg.StreamADCharts = config_get_boolean(ConfigSectionML, "stream anomaly detection charts", true); + cfg->stream_anomaly_detection_charts = config_get_boolean(config_section_ml, "stream anomaly detection charts", true); } diff --git a/ml/Config.h b/ml/Config.h deleted file mode 100644 index 26ceaf3004..0000000000 --- a/ml/Config.h +++ /dev/null @@ -1,52 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_CONFIG_H -#define ML_CONFIG_H - -#include "ml-private.h" - -namespace ml { - -class Config { -public: - bool EnableAnomalyDetection; - - unsigned MaxTrainSamples; - unsigned MinTrainSamples; - unsigned TrainEvery; - - unsigned NumModelsToUse; - - unsigned DBEngineAnomalyRateEvery; - - unsigned DiffN; - unsigned SmoothN; - unsigned LagN; - - double RandomSamplingRatio; - unsigned MaxKMeansIters; - - double DimensionAnomalyScoreThreshold; - - double HostAnomalyRateThreshold; - RRDR_TIME_GROUPING AnomalyDetectionGroupingMethod; - time_t AnomalyDetectionQueryDuration; - - bool StreamADCharts; - - std::string HostsToSkip; - SIMPLE_PATTERN *SP_HostsToSkip; - - std::string ChartsToSkip; - SIMPLE_PATTERN *SP_ChartsToSkip; - - std::vector RandomNums; - - void readMLConfig(); -}; - -extern Config Cfg; - -} // namespace ml - -#endif /* ML_CONFIG_H */ diff --git a/ml/Dimension.cc b/ml/Dimension.cc deleted file mode 100644 index db92568958..0000000000 --- a/ml/Dimension.cc +++ /dev/null @@ -1,346 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "Config.h" -#include "Dimension.h" -#include "Query.h" -#include "Host.h" - -using namespace ml; - -static const char *mls2str(MachineLearningStatus MLS) { - switch (MLS) { - case ml::MachineLearningStatus::Enabled: - return "enabled"; - case ml::MachineLearningStatus::DisabledDueToUniqueUpdateEvery: - return "disabled-ue"; - case ml::MachineLearningStatus::DisabledDueToExcludedChart: - return "disabled-sp"; - default: - return "unknown"; - } -} - -static const char *mt2str(MetricType MT) { - switch (MT) { - case ml::MetricType::Constant: - return "constant"; - case ml::MetricType::Variable: - return "variable"; - default: - return "unknown"; - } -} - -static const char *ts2str(TrainingStatus TS) { - switch (TS) { - case ml::TrainingStatus::PendingWithModel: - return "pending-with-model"; - case ml::TrainingStatus::PendingWithoutModel: - return "pending-without-model"; - case ml::TrainingStatus::Trained: - return "trained"; - case ml::TrainingStatus::Untrained: - return "untrained"; - default: - return "unknown"; - } -} - -static const char *tr2str(TrainingResult TR) { - switch (TR) { - case ml::TrainingResult::Ok: - return "ok"; - case ml::TrainingResult::InvalidQueryTimeRange: - return "invalid-query"; - case ml::TrainingResult::NotEnoughCollectedValues: - return "missing-values"; - case ml::TrainingResult::NullAcquiredDimension: - return "null-acquired-dim"; - case ml::TrainingResult::ChartUnderReplication: - return "chart-under-replication"; - default: - return "unknown"; - } -} - -std::pair Dimension::getCalculatedNumbers(const TrainingRequest &TrainingReq) { - TrainingResponse TrainingResp = {}; - - TrainingResp.RequestTime = TrainingReq.RequestTime; - TrainingResp.FirstEntryOnRequest = TrainingReq.FirstEntryOnRequest; - TrainingResp.LastEntryOnRequest = TrainingReq.LastEntryOnRequest; - - TrainingResp.FirstEntryOnResponse = rrddim_first_entry_s_of_tier(RD, 0); - TrainingResp.LastEntryOnResponse = rrddim_last_entry_s_of_tier(RD, 0); - - size_t MinN = Cfg.MinTrainSamples; - size_t MaxN = Cfg.MaxTrainSamples; - - // Figure out what our time window should be. - TrainingResp.QueryBeforeT = TrainingResp.LastEntryOnResponse; - TrainingResp.QueryAfterT = std::max( - TrainingResp.QueryBeforeT - static_cast((MaxN - 1) * updateEvery()), - TrainingResp.FirstEntryOnResponse - ); - - if (TrainingResp.QueryAfterT >= TrainingResp.QueryBeforeT) { - TrainingResp.Result = TrainingResult::InvalidQueryTimeRange; - return { nullptr, TrainingResp }; - } - - if (rrdset_is_replicating(RD->rrdset)) { - TrainingResp.Result = TrainingResult::ChartUnderReplication; - return { nullptr, TrainingResp }; - } - - CalculatedNumber *CNs = new CalculatedNumber[MaxN * (Cfg.LagN + 1)](); - - // Start the query. - size_t Idx = 0; - - CalculatedNumber LastValue = std::numeric_limits::quiet_NaN(); - Query Q = Query(getRD()); - - Q.init(TrainingResp.QueryAfterT, TrainingResp.QueryBeforeT); - while (!Q.isFinished()) { - if (Idx == MaxN) - break; - - auto P = Q.nextMetric(); - - CalculatedNumber Value = P.second; - - if (netdata_double_isnumber(Value)) { - if (!TrainingResp.DbAfterT) - TrainingResp.DbAfterT = P.first; - TrainingResp.DbBeforeT = P.first; - - CNs[Idx] = Value; - LastValue = CNs[Idx]; - TrainingResp.CollectedValues++; - } else - CNs[Idx] = LastValue; - - Idx++; - } - TrainingResp.TotalValues = Idx; - - if (TrainingResp.CollectedValues < MinN) { - TrainingResp.Result = TrainingResult::NotEnoughCollectedValues; - - delete[] CNs; - return { nullptr, TrainingResp }; - } - - // Find first non-NaN value. - for (Idx = 0; std::isnan(CNs[Idx]); Idx++, TrainingResp.TotalValues--) { } - - // Overwrite NaN values. - if (Idx != 0) - memmove(CNs, &CNs[Idx], sizeof(CalculatedNumber) * TrainingResp.TotalValues); - - TrainingResp.Result = TrainingResult::Ok; - return { CNs, TrainingResp }; -} - -TrainingResult Dimension::trainModel(const TrainingRequest &TrainingReq) { - auto P = getCalculatedNumbers(TrainingReq); - CalculatedNumber *CNs = P.first; - TrainingResponse TrainingResp = P.second; - - if (TrainingResp.Result != TrainingResult::Ok) { - std::lock_guard L(M); - - MT = MetricType::Constant; - - switch (TS) { - case TrainingStatus::PendingWithModel: - TS = TrainingStatus::Trained; - break; - case TrainingStatus::PendingWithoutModel: - TS = TrainingStatus::Untrained; - break; - default: - break; - } - - TR = TrainingResp; - - LastTrainingTime = TrainingResp.LastEntryOnResponse; - return TrainingResp.Result; - } - - unsigned N = TrainingResp.TotalValues; - unsigned TargetNumSamples = Cfg.MaxTrainSamples * Cfg.RandomSamplingRatio; - double SamplingRatio = std::min(static_cast(TargetNumSamples) / N, 1.0); - - SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN, - SamplingRatio, Cfg.RandomNums); - std::vector Samples; - SB.preprocess(Samples); - - KMeans KM; - KM.train(Samples, Cfg.MaxKMeansIters); - - { - std::lock_guard L(M); - - if (Models.size() < Cfg.NumModelsToUse) { - Models.push_back(std::move(KM)); - } else { - std::rotate(std::begin(Models), std::begin(Models) + 1, std::end(Models)); - Models[Models.size() - 1] = std::move(KM); - } - - MT = MetricType::Constant; - TS = TrainingStatus::Trained; - TR = TrainingResp; - LastTrainingTime = rrddim_last_entry_s(RD); - } - - delete[] CNs; - return TrainingResp.Result; -} - -void Dimension::scheduleForTraining(time_t CurrT) { - switch (MT) { - case MetricType::Constant: { - return; - } default: - break; - } - - switch (TS) { - case TrainingStatus::PendingWithModel: - case TrainingStatus::PendingWithoutModel: - break; - case TrainingStatus::Untrained: { - Host *H = reinterpret_cast(RD->rrdset->rrdhost->ml_host); - TS = TrainingStatus::PendingWithoutModel; - H->scheduleForTraining(getTrainingRequest(CurrT)); - break; - } - case TrainingStatus::Trained: { - bool NeedsTraining = (time_t)(LastTrainingTime + (Cfg.TrainEvery * updateEvery())) < CurrT; - - if (NeedsTraining) { - Host *H = reinterpret_cast(RD->rrdset->rrdhost->ml_host); - TS = TrainingStatus::PendingWithModel; - H->scheduleForTraining(getTrainingRequest(CurrT)); - } - break; - } - } -} - -bool Dimension::predict(time_t CurrT, CalculatedNumber Value, bool Exists) { - // Nothing to do if ML is disabled for this dimension - if (MLS != MachineLearningStatus::Enabled) - return false; - - // Don't treat values that don't exist as anomalous - if (!Exists) { - CNs.clear(); - return false; - } - - // Save the value and return if we don't have enough values for a sample - unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN; - if (CNs.size() < N) { - CNs.push_back(Value); - return false; - } - - // Push the value and check if it's different from the last one - bool SameValue = true; - std::rotate(std::begin(CNs), std::begin(CNs) + 1, std::end(CNs)); - if (CNs[N - 1] != Value) - SameValue = false; - CNs[N - 1] = Value; - - // Create the sample - CalculatedNumber TmpCNs[N * (Cfg.LagN + 1)]; - memset(TmpCNs, 0, N * (Cfg.LagN + 1) * sizeof(CalculatedNumber)); - std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber)); - SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, - Cfg.DiffN, Cfg.SmoothN, Cfg.LagN, - 1.0, Cfg.RandomNums); - SB.preprocess(Feature); - - /* - * Lock to predict and possibly schedule the dimension for training - */ - - std::unique_lock L(M, std::defer_lock); - if (!L.try_lock()) { - return false; - } - - // Mark the metric time as variable if we received different values - if (!SameValue) - MT = MetricType::Variable; - - // Decide if the dimension needs to be scheduled for training - scheduleForTraining(CurrT); - - // Nothing to do if we don't have a model - switch (TS) { - case TrainingStatus::Untrained: - case TrainingStatus::PendingWithoutModel: - return false; - default: - break; - } - - /* - * Use the KMeans models to check if the value is anomalous - */ - - size_t ModelsConsulted = 0; - size_t Sum = 0; - - for (const auto &KM : Models) { - ModelsConsulted++; - - double AnomalyScore = KM.anomalyScore(Feature); - if (AnomalyScore == std::numeric_limits::quiet_NaN()) - continue; - - if (AnomalyScore < (100 * Cfg.DimensionAnomalyScoreThreshold)) { - global_statistics_ml_models_consulted(ModelsConsulted); - return false; - } - - Sum += 1; - } - - global_statistics_ml_models_consulted(ModelsConsulted); - return Sum; -} - -std::vector Dimension::getModels() { - std::unique_lock L(M); - return Models; -} - -void Dimension::dump() const { - const char *ChartId = rrdset_id(RD->rrdset); - const char *DimensionId = rrddim_id(RD); - - const char *MLS_Str = mls2str(MLS); - const char *MT_Str = mt2str(MT); - const char *TS_Str = ts2str(TS); - const char *TR_Str = tr2str(TR.Result); - - const char *fmt = - "[ML] %s.%s: MLS=%s, MT=%s, TS=%s, Result=%s, " - "ReqTime=%ld, FEOReq=%ld, LEOReq=%ld, " - "FEOResp=%ld, LEOResp=%ld, QTR=<%ld, %ld>, DBTR=<%ld, %ld>, Collected=%zu, Total=%zu"; - - error(fmt, - ChartId, DimensionId, MLS_Str, MT_Str, TS_Str, TR_Str, - TR.RequestTime, TR.FirstEntryOnRequest, TR.LastEntryOnRequest, - TR.FirstEntryOnResponse, TR.LastEntryOnResponse, - TR.QueryAfterT, TR.QueryBeforeT, TR.DbAfterT, TR.DbBeforeT, TR.CollectedValues, TR.TotalValues - ); -} diff --git a/ml/Dimension.h b/ml/Dimension.h deleted file mode 100644 index 2b1adfff9e..0000000000 --- a/ml/Dimension.h +++ /dev/null @@ -1,198 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_DIMENSION_H -#define ML_DIMENSION_H - -#include "Mutex.h" -#include "Stats.h" -#include "Query.h" -#include "Config.h" - -#include "ml-private.h" - -namespace ml { - -static inline std::string getMLDimensionID(RRDDIM *RD) { - RRDSET *RS = RD->rrdset; - - std::stringstream SS; - SS << rrdset_context(RS) << "|" << rrdset_id(RS) << "|" << rrddim_name(RD); - return SS.str(); -} - -enum class MachineLearningStatus { - // Enable training/prediction - Enabled, - - // Disable due to update every being different from the host's - DisabledDueToUniqueUpdateEvery, - - // Disable because configuration pattern matches the chart's id - DisabledDueToExcludedChart, -}; - -enum class TrainingStatus { - // We don't have a model for this dimension - Untrained, - - // Request for training sent, but we don't have any models yet - PendingWithoutModel, - - // Request to update existing models sent - PendingWithModel, - - // Have a valid, up-to-date model - Trained, -}; - -enum class MetricType { - // The dimension has constant values, no need to train - Constant, - - // The dimension's values fluctuate, we need to generate a model - Variable, -}; - -struct TrainingRequest { - // Chart/dimension we want to train - STRING *ChartId; - STRING *DimensionId; - - // Creation time of request - time_t RequestTime; - - // First/last entry of this dimension in DB - // at the point the request was made - time_t FirstEntryOnRequest; - time_t LastEntryOnRequest; -}; - -void dumpTrainingRequest(const TrainingRequest &TrainingReq, const char *Prefix); - -enum TrainingResult { - // We managed to create a KMeans model - Ok, - // Could not query DB with a correct time range - InvalidQueryTimeRange, - // Did not gather enough data from DB to run KMeans - NotEnoughCollectedValues, - // Acquired a null dimension - NullAcquiredDimension, - // Chart is under replication - ChartUnderReplication, -}; - -struct TrainingResponse { - // Time when the request for this response was made - time_t RequestTime; - - // First/last entry of the dimension in DB when generating the request - time_t FirstEntryOnRequest; - time_t LastEntryOnRequest; - - // First/last entry of the dimension in DB when generating the response - time_t FirstEntryOnResponse; - time_t LastEntryOnResponse; - - // After/Before timestamps of our DB query - time_t QueryAfterT; - time_t QueryBeforeT; - - // Actual after/before returned by the DB query ops - time_t DbAfterT; - time_t DbBeforeT; - - // Number of doubles returned by the DB query - size_t CollectedValues; - - // Number of values we return to the caller - size_t TotalValues; - - // Result of training response - TrainingResult Result; -}; - -void dumpTrainingResponse(const TrainingResponse &TrainingResp, const char *Prefix); - -class Dimension { -public: - Dimension(RRDDIM *RD) : - RD(RD), - MT(MetricType::Constant), - TS(TrainingStatus::Untrained), - TR(), - LastTrainingTime(0) - { - if (simple_pattern_matches(Cfg.SP_ChartsToSkip, rrdset_name(RD->rrdset))) - MLS = MachineLearningStatus::DisabledDueToExcludedChart; - else if (RD->update_every != RD->rrdset->rrdhost->rrd_update_every) - MLS = MachineLearningStatus::DisabledDueToUniqueUpdateEvery; - else - MLS = MachineLearningStatus::Enabled; - - Models.reserve(Cfg.NumModelsToUse); - } - - RRDDIM *getRD() const { - return RD; - } - - unsigned updateEvery() const { - return RD->update_every; - } - - MetricType getMT() const { - return MT; - } - - TrainingStatus getTS() const { - return TS; - } - - MachineLearningStatus getMLS() const { - return MLS; - } - - TrainingResult trainModel(const TrainingRequest &TR); - - void scheduleForTraining(time_t CurrT); - - bool predict(time_t CurrT, CalculatedNumber Value, bool Exists); - - std::vector getModels(); - - void dump() const; - -private: - TrainingRequest getTrainingRequest(time_t CurrT) const { - return TrainingRequest { - string_dup(RD->rrdset->id), - string_dup(RD->id), - CurrT, - rrddim_first_entry_s(RD), - rrddim_last_entry_s(RD) - }; - } - -private: - std::pair getCalculatedNumbers(const TrainingRequest &TrainingReq); - -public: - RRDDIM *RD; - MetricType MT; - TrainingStatus TS; - TrainingResponse TR; - - time_t LastTrainingTime; - - MachineLearningStatus MLS; - - std::vector CNs; - DSample Feature; - std::vector Models; - Mutex M; -}; - -} // namespace ml - -#endif /* ML_DIMENSION_H */ diff --git a/ml/Host.cc b/ml/Host.cc deleted file mode 100644 index e22580be9a..0000000000 --- a/ml/Host.cc +++ /dev/null @@ -1,387 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "Config.h" -#include "Host.h" -#include "Queue.h" -#include "ADCharts.h" - -#include "json/single_include/nlohmann/json.hpp" - -using namespace ml; - -void Host::addChart(Chart *C) { - std::lock_guard L(M); - Charts[C->getRS()] = C; -} - -void Host::removeChart(Chart *C) { - std::lock_guard L(M); - Charts.erase(C->getRS()); -} - -void Host::getConfigAsJson(BUFFER *wb) const { - buffer_json_member_add_uint64(wb, "version", 1); - - buffer_json_member_add_boolean(wb, "enabled", Cfg.EnableAnomalyDetection); - - buffer_json_member_add_uint64(wb, "min-train-samples", Cfg.MinTrainSamples); - buffer_json_member_add_uint64(wb, "max-train-samples", Cfg.MaxTrainSamples); - buffer_json_member_add_uint64(wb, "train-every", Cfg.TrainEvery); - - buffer_json_member_add_uint64(wb, "diff-n", Cfg.DiffN); - buffer_json_member_add_uint64(wb, "smooth-n", Cfg.SmoothN); - buffer_json_member_add_uint64(wb, "lag-n", Cfg.LagN); - - buffer_json_member_add_double(wb, "random-sampling-ratio", Cfg.RandomSamplingRatio); - buffer_json_member_add_uint64(wb, "max-kmeans-iters", Cfg.MaxKMeansIters); - - buffer_json_member_add_double(wb, "dimension-anomaly-score-threshold", Cfg.DimensionAnomalyScoreThreshold); - - buffer_json_member_add_double(wb, "host-anomaly-rate-threshold", Cfg.HostAnomalyRateThreshold); - buffer_json_member_add_string(wb, "anomaly-detection-grouping-method", time_grouping_method2string(Cfg.AnomalyDetectionGroupingMethod)); - buffer_json_member_add_time_t(wb, "anomaly-detection-query-duration", Cfg.AnomalyDetectionQueryDuration); - - buffer_json_member_add_string(wb, "hosts-to-skip", Cfg.HostsToSkip.c_str()); - buffer_json_member_add_string(wb, "charts-to-skip", Cfg.ChartsToSkip.c_str()); -} - -void Host::getModelsAsJson(nlohmann::json &Json) { - std::lock_guard L(M); - - for (auto &CP : Charts) { - Chart *C = CP.second; - C->getModelsAsJson(Json); - } -} - -#define WORKER_JOB_DETECTION_PREP 0 -#define WORKER_JOB_DETECTION_DIM_CHART 1 -#define WORKER_JOB_DETECTION_HOST_CHART 2 -#define WORKER_JOB_DETECTION_STATS 3 -#define WORKER_JOB_DETECTION_RESOURCES 4 - -void Host::detectOnce() { - worker_is_busy(WORKER_JOB_DETECTION_PREP); - - MLS = {}; - MachineLearningStats MLSCopy = {}; - TrainingStats TSCopy = {}; - - { - std::lock_guard L(M); - - /* - * prediction/detection stats - */ - for (auto &CP : Charts) { - Chart *C = CP.second; - - if (!C->isAvailableForML()) - continue; - - MachineLearningStats ChartMLS = C->getMLS(); - - MLS.NumMachineLearningStatusEnabled += ChartMLS.NumMachineLearningStatusEnabled; - MLS.NumMachineLearningStatusDisabledUE += ChartMLS.NumMachineLearningStatusDisabledUE; - MLS.NumMachineLearningStatusDisabledSP += ChartMLS.NumMachineLearningStatusDisabledSP; - - MLS.NumMetricTypeConstant += ChartMLS.NumMetricTypeConstant; - MLS.NumMetricTypeVariable += ChartMLS.NumMetricTypeVariable; - - MLS.NumTrainingStatusUntrained += ChartMLS.NumTrainingStatusUntrained; - MLS.NumTrainingStatusPendingWithoutModel += ChartMLS.NumTrainingStatusPendingWithoutModel; - MLS.NumTrainingStatusTrained += ChartMLS.NumTrainingStatusTrained; - MLS.NumTrainingStatusPendingWithModel += ChartMLS.NumTrainingStatusPendingWithModel; - - MLS.NumAnomalousDimensions += ChartMLS.NumAnomalousDimensions; - MLS.NumNormalDimensions += ChartMLS.NumNormalDimensions; - } - - HostAnomalyRate = 0.0; - size_t NumActiveDimensions = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions; - if (NumActiveDimensions) - HostAnomalyRate = static_cast(MLS.NumAnomalousDimensions) / NumActiveDimensions; - - MLSCopy = MLS; - - /* - * training stats - */ - TSCopy = TS; - - TS.QueueSize = 0; - TS.NumPoppedItems = 0; - - TS.AllottedUT = 0; - TS.ConsumedUT = 0; - TS.RemainingUT = 0; - - TS.TrainingResultOk = 0; - TS.TrainingResultInvalidQueryTimeRange = 0; - TS.TrainingResultNotEnoughCollectedValues = 0; - TS.TrainingResultNullAcquiredDimension = 0; - TS.TrainingResultChartUnderReplication = 0; - } - - // Calc the avg values - if (TSCopy.NumPoppedItems) { - TSCopy.QueueSize /= TSCopy.NumPoppedItems; - TSCopy.AllottedUT /= TSCopy.NumPoppedItems; - TSCopy.ConsumedUT /= TSCopy.NumPoppedItems; - TSCopy.RemainingUT /= TSCopy.NumPoppedItems; - - TSCopy.TrainingResultOk /= TSCopy.NumPoppedItems; - TSCopy.TrainingResultInvalidQueryTimeRange /= TSCopy.NumPoppedItems; - TSCopy.TrainingResultNotEnoughCollectedValues /= TSCopy.NumPoppedItems; - TSCopy.TrainingResultNullAcquiredDimension /= TSCopy.NumPoppedItems; - TSCopy.TrainingResultChartUnderReplication /= TSCopy.NumPoppedItems; - } else { - TSCopy.QueueSize = 0; - TSCopy.AllottedUT = 0; - TSCopy.ConsumedUT = 0; - TSCopy.RemainingUT = 0; - } - - if(!RH) - return; - - worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART); - updateDimensionsChart(RH, MLSCopy); - - worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART); - updateHostAndDetectionRateCharts(RH, HostAnomalyRate * 10000.0); - -#ifdef NETDATA_ML_RESOURCE_CHARTS - worker_is_busy(WORKER_JOB_DETECTION_RESOURCES); - struct rusage PredictionRU; - getrusage(RUSAGE_THREAD, &PredictionRU); - updateResourceUsageCharts(RH, PredictionRU, TSCopy.TrainingRU); -#endif - - worker_is_busy(WORKER_JOB_DETECTION_STATS); - updateTrainingStatisticsChart(RH, TSCopy); -} - -class AcquiredDimension { -public: - static AcquiredDimension find(RRDHOST *RH, STRING *ChartId, STRING *DimensionId) { - RRDDIM_ACQUIRED *AcqRD = nullptr; - Dimension *D = nullptr; - - RRDSET *RS = rrdset_find(RH, string2str(ChartId)); - if (RS) { - AcqRD = rrddim_find_and_acquire(RS, string2str(DimensionId)); - if (AcqRD) { - RRDDIM *RD = rrddim_acquired_to_rrddim(AcqRD); - if (RD) - D = reinterpret_cast(RD->ml_dimension); - } - } - - return AcquiredDimension(AcqRD, D); - } - -private: - AcquiredDimension(RRDDIM_ACQUIRED *AcqRD, Dimension *D) : AcqRD(AcqRD), D(D) {} - -public: - TrainingResult train(const TrainingRequest &TR) { - if (!D) - return TrainingResult::NullAcquiredDimension; - - return D->trainModel(TR); - } - - ~AcquiredDimension() { - if (AcqRD) - rrddim_acquired_release(AcqRD); - } - -private: - RRDDIM_ACQUIRED *AcqRD; - Dimension *D; -}; - -void Host::scheduleForTraining(TrainingRequest TR) { - TrainingQueue.push(TR); -} - -#define WORKER_JOB_TRAINING_FIND 0 -#define WORKER_JOB_TRAINING_TRAIN 1 -#define WORKER_JOB_TRAINING_STATS 2 - -void Host::train() { - worker_register("MLTRAIN"); - worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find"); - worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train"); - worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats"); - - service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_cancel_anomaly_detection_threads, RH, true); - - while (service_running(SERVICE_ML_TRAINING)) { - auto P = TrainingQueue.pop(); - TrainingRequest TrainingReq = P.first; - size_t Size = P.second; - - if (ThreadsCancelled) { - info("Stopping training thread because it was cancelled."); - break; - } - - usec_t AllottedUT = (Cfg.TrainEvery * RH->rrd_update_every * USEC_PER_SEC) / Size; - if (AllottedUT > USEC_PER_SEC) - AllottedUT = USEC_PER_SEC; - - usec_t StartUT = now_monotonic_usec(); - TrainingResult TrainingRes; - { - worker_is_busy(WORKER_JOB_TRAINING_FIND); - AcquiredDimension AcqDim = AcquiredDimension::find(RH, TrainingReq.ChartId, TrainingReq.DimensionId); - - worker_is_busy(WORKER_JOB_TRAINING_TRAIN); - TrainingRes = AcqDim.train(TrainingReq); - - string_freez(TrainingReq.ChartId); - string_freez(TrainingReq.DimensionId); - } - usec_t ConsumedUT = now_monotonic_usec() - StartUT; - - worker_is_busy(WORKER_JOB_TRAINING_STATS); - - usec_t RemainingUT = 0; - if (ConsumedUT < AllottedUT) - RemainingUT = AllottedUT - ConsumedUT; - - { - std::lock_guard L(M); - - if (TS.AllottedUT == 0) { - struct rusage TRU; - getrusage(RUSAGE_THREAD, &TRU); - TS.TrainingRU = TRU; - } - - TS.QueueSize += Size; - TS.NumPoppedItems += 1; - - TS.AllottedUT += AllottedUT; - TS.ConsumedUT += ConsumedUT; - TS.RemainingUT += RemainingUT; - - switch (TrainingRes) { - case TrainingResult::Ok: - TS.TrainingResultOk += 1; - break; - case TrainingResult::InvalidQueryTimeRange: - TS.TrainingResultInvalidQueryTimeRange += 1; - break; - case TrainingResult::NotEnoughCollectedValues: - TS.TrainingResultNotEnoughCollectedValues += 1; - break; - case TrainingResult::NullAcquiredDimension: - TS.TrainingResultNullAcquiredDimension += 1; - break; - case TrainingResult::ChartUnderReplication: - TS.TrainingResultChartUnderReplicati