From 6850878e697d66dc90b9af1e750b22238c63c292 Mon Sep 17 00:00:00 2001 From: vkalintiris Date: Wed, 5 Oct 2022 10:11:12 +0300 Subject: Remove anomaly detector (#13657) * Move all dims under one class. * Dimension owns anomaly rate RD. * Remove Dimension::isAnomalous() * Remove Dimension::trainEvery() * Rm ml/kmeans * Remove anomaly detector The same logic can be implemented by using the host anomaly rate dim. * Profile plugin. * Revert "Profile plugin." This reverts commit e3db37cb49c514502c5216cfe7bca2a003fb90f1. * Add separate source files for anomaly detection charts. * Handle training/prediction sync at the dimension level. * Keep multiple KMeans models in mem. * Move feature extraction outside KMeans class. * Use multiple models. * Add /api/v1/ml_models endpoint. * Remove Dimension::getID() * Use just 1 model and fix tests. * Add detection logic based on rrdr. * Remove config options related to anomaly detection. * Make anomaly detection queries configurable. * Fix ad query duration option. * Finalize queries in all code paths. * Check if query was initialized before finalizing it * Do not leak OWA * Profile plugin. * Revert "Profile plugin." This reverts commit 5c77145d0df7e091d030476c480ab8d9cbceb89e. * Change context from anomaly_detection to detector_events. --- .gitmodules | 2 +- CMakeLists.txt | 20 +-- Makefile.am | 26 ++-- configure.ac | 6 +- ml/ADCharts.cc | 236 ++++++++++++++++++++++++++++++ ml/ADCharts.h | 23 +++ ml/BitBufferCounter.cc | 29 ---- ml/BitBufferCounter.h | 54 ------- ml/BitRateWindow.cc | 75 ---------- ml/BitRateWindow.h | 170 ---------------------- ml/Config.cc | 46 ++---- ml/Config.h | 11 +- ml/Database.cc | 127 ---------------- ml/Database.h | 131 ----------------- ml/Dimension.cc | 86 ++++++++--- ml/Dimension.h | 169 ++++++++-------------- ml/Host.cc | 352 ++++----------------------------------------- ml/Host.h | 30 ++-- ml/KMeans.cc | 43 ++++++ ml/KMeans.h | 41 ++++++ ml/Makefile.am | 8 -- ml/Query.h | 13 +- ml/SamplesBuffer.cc | 150 +++++++++++++++++++ ml/SamplesBuffer.h | 146 +++++++++++++++++++ ml/SamplesBufferTests.cc | 146 +++++++++++++++++++ ml/Tests.cc | 301 -------------------------------------- ml/dlib | 1 + ml/kmeans/KMeans.cc | 55 ------- ml/kmeans/KMeans.h | 34 ----- ml/kmeans/Makefile.am | 4 - ml/kmeans/SamplesBuffer.cc | 150 ------------------- ml/kmeans/SamplesBuffer.h | 146 ------------------- ml/kmeans/Tests.cc | 143 ------------------ ml/kmeans/dlib | 1 - ml/ml-dummy.c | 24 ++-- ml/ml-private.h | 2 +- ml/ml.cc | 61 ++------ ml/ml.h | 7 +- web/api/web_api_v1.c | 88 ++---------- 39 files changed, 1032 insertions(+), 2125 deletions(-) create mode 100644 ml/ADCharts.cc create mode 100644 ml/ADCharts.h delete mode 100644 ml/BitBufferCounter.cc delete mode 100644 ml/BitBufferCounter.h delete mode 100644 ml/BitRateWindow.cc delete mode 100644 ml/BitRateWindow.h delete mode 100644 ml/Database.cc delete mode 100644 ml/Database.h create mode 100644 ml/KMeans.cc create mode 100644 ml/KMeans.h delete mode 100644 ml/Makefile.am create mode 100644 ml/SamplesBuffer.cc create mode 100644 ml/SamplesBuffer.h create mode 100644 ml/SamplesBufferTests.cc delete mode 100644 ml/Tests.cc create mode 160000 ml/dlib delete mode 100644 ml/kmeans/KMeans.cc delete mode 100644 ml/kmeans/KMeans.h delete mode 100644 ml/kmeans/Makefile.am delete mode 100644 ml/kmeans/SamplesBuffer.cc delete mode 100644 ml/kmeans/SamplesBuffer.h delete mode 100644 ml/kmeans/Tests.cc delete mode 160000 ml/kmeans/dlib diff --git a/.gitmodules b/.gitmodules index 0f01d3d680..d3c7ace405 100644 --- a/.gitmodules +++ b/.gitmodules @@ -5,7 +5,7 @@ path = aclk/aclk-schemas url = https://github.com/netdata/aclk-schemas.git [submodule "ml/kmeans/dlib"] - path = ml/kmeans/dlib + path = ml/dlib url = https://github.com/davisking/dlib.git shallow = true ignore = dirty diff --git a/CMakeLists.txt b/CMakeLists.txt index fc7e47fbd0..e0c114ab4b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -348,11 +348,11 @@ ENDIF() # Detect ml dependencies file(STRINGS "${CMAKE_SOURCE_DIR}/config.h" DEFINE_ENABLE_ML REGEX "^#define ENABLE_ML 1$") IF(DEFINE_ENABLE_ML MATCHES ".+" AND - EXISTS "${CMAKE_SOURCE_DIR}/ml/kmeans/dlib/dlib/all/source.cpp" AND + EXISTS "${CMAKE_SOURCE_DIR}/ml/dlib/dlib/all/source.cpp" AND EXISTS "${CMAKE_SOURCE_DIR}/ml/json/single_include/nlohmann/json.hpp") set(ENABLE_ML True) list(APPEND NETDATA_COMMON_CFLAGS "-DDLIB_NO_GUI_SUPPORT") - list(APPEND NETDATA_COMMON_INCLUDE_DIRS "ml/kmeans/dlib") + list(APPEND NETDATA_COMMON_INCLUDE_DIRS "ml/dlib") ELSE() set(ENABLE_ML False) ENDIF() @@ -1025,24 +1025,18 @@ set(ML_FILES IF(ENABLE_ML) list(APPEND ML_FILES - ml/BitBufferCounter.h - ml/BitBufferCounter.cc - ml/BitRateWindow.h - ml/BitRateWindow.cc ml/Config.h ml/Config.cc - ml/Database.h - ml/Database.cc ml/Dimension.cc ml/Dimension.h ml/Host.h ml/Host.cc ml/Query.h - ml/kmeans/KMeans.h - ml/kmeans/KMeans.cc - ml/kmeans/SamplesBuffer.h - ml/kmeans/SamplesBuffer.cc - ml/kmeans/dlib/dlib/all/source.cpp + ml/KMeans.h + ml/KMeans.cc + ml/SamplesBuffer.h + ml/SamplesBuffer.cc + ml/dlib/dlib/all/source.cpp ml/json/single_include/nlohmann/json.hpp ml/ml.cc ml/ml-private.h diff --git a/Makefile.am b/Makefile.am index c5e3e0949d..34026e9888 100644 --- a/Makefile.am +++ b/Makefile.am @@ -38,7 +38,7 @@ EXTRA_DIST = \ build/m4/ax_c_mallopt.m4 \ build/m4/tcmalloc.m4 \ build/m4/ax_c__generic.m4 \ - ml/kmeans/dlib \ + ml/dlib \ README.md \ LICENSE \ REDISTRIBUTED.md \ @@ -109,7 +109,6 @@ SUBDIRS += \ claim \ parser \ spawn \ - ml \ $(NULL) AM_CFLAGS = \ @@ -237,39 +236,34 @@ ML_FILES = \ if ENABLE_ML ML_FILES += \ - ml/BitBufferCounter.h \ - ml/BitBufferCounter.cc \ - ml/BitRateWindow.h \ - ml/BitRateWindow.cc \ + ml/ADCharts.h \ + ml/ADCharts.cc \ ml/Config.h \ ml/Config.cc \ - ml/Database.h \ - ml/Database.cc \ ml/Dimension.cc \ ml/Dimension.h \ ml/Host.h \ ml/Host.cc \ ml/Query.h \ - ml/kmeans/KMeans.h \ - ml/kmeans/KMeans.cc \ - ml/kmeans/SamplesBuffer.h \ - ml/kmeans/SamplesBuffer.cc \ - ml/kmeans/dlib/dlib/all/source.cpp \ + ml/KMeans.h \ + ml/KMeans.cc \ + ml/SamplesBuffer.h \ + ml/SamplesBuffer.cc \ + ml/dlib/dlib/all/source.cpp \ ml/json/single_include/nlohmann/json.hpp \ ml/ml.cc \ ml/ml-private.h \ $(NULL) # Disable warnings from dlib library -ml/kmeans/dlib/dlib/all/source.$(OBJEXT) : CXXFLAGS += -Wno-sign-compare -Wno-type-limits -Wno-aggressive-loop-optimizations -Wno-stringop-overflow +ml/dlib/dlib/all/source.$(OBJEXT) : CXXFLAGS += -Wno-sign-compare -Wno-type-limits -Wno-aggressive-loop-optimizations -Wno-stringop-overflow endif if ENABLE_ML_TESTS ML_TESTS_FILES = \ - ml/kmeans/Tests.cc \ - ml/Tests.cc \ + ml/SamplesBufferTests.cc \ $(NULL) endif diff --git a/configure.ac b/configure.ac index d9cf075138..ef060a91f8 100644 --- a/configure.ac +++ b/configure.ac @@ -1109,7 +1109,7 @@ fi # Check if submodules have not been fetched. Fail if ML was explicitly requested. AC_MSG_CHECKING([if git submodules are present for machine learning functionality]) -if test -f "ml/kmeans/dlib/dlib/all/source.cpp" -a -f "ml/json/single_include/nlohmann/json.hpp"; then +if test -f "ml/dlib/dlib/all/source.cpp" -a -f "ml/json/single_include/nlohmann/json.hpp"; then AC_MSG_RESULT([yes]) have_ml_submodules="yes" else @@ -1149,7 +1149,7 @@ fi AM_CONDITIONAL([ENABLE_ML], [test "${build_ml}" = "yes"]) if test "${build_ml}" = "yes"; then AC_DEFINE([ENABLE_ML], [1], [anomaly detection usability]) - OPTIONAL_ML_CFLAGS="-DDLIB_NO_GUI_SUPPORT -I \$(abs_top_srcdir)/ml/kmeans/dlib" + OPTIONAL_ML_CFLAGS="-DDLIB_NO_GUI_SUPPORT -I \$(abs_top_srcdir)/ml/dlib" OPTIONAL_ML_LIBS="" fi @@ -1695,8 +1695,6 @@ AC_CONFIG_FILES([ exporting/tests/Makefile health/Makefile health/notifications/Makefile - ml/Makefile - ml/kmeans/Makefile libnetdata/Makefile libnetdata/tests/Makefile libnetdata/adaptive_resortable_list/Makefile diff --git a/ml/ADCharts.cc b/ml/ADCharts.cc new file mode 100644 index 0000000000..5a78b97918 --- /dev/null +++ b/ml/ADCharts.cc @@ -0,0 +1,236 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "ADCharts.h" +#include "Config.h" + +void ml::updateDimensionsChart(RRDHOST *RH, + collected_number NumTrainedDimensions, + collected_number NumNormalDimensions, + collected_number NumAnomalousDimensions) { + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *NumTotalDimensionsRD = nullptr; + static thread_local RRDDIM *NumTrainedDimensionsRD = nullptr; + static thread_local RRDDIM *NumNormalDimensionsRD = nullptr; + static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr; + + if (!RS) { + std::stringstream IdSS, NameSS; + + IdSS << "dimensions_on_" << localhost->machine_guid; + NameSS << "dimensions_on_" << localhost->hostname; + + RS = rrdset_create( + RH, + "anomaly_detection", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "dimensions", // family + "anomaly_detection.dimensions", // ctx + "Anomaly detection dimensions", // title + "dimensions", // units + "netdata", // plugin + "ml", // module + 39183, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); + + NumTotalDimensionsRD = rrddim_add(RS, "total", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NumTrainedDimensionsRD = rrddim_add(RS, "trained", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NumNormalDimensionsRD = rrddim_add(RS, "normal", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NumAnomalousDimensionsRD = rrddim_add(RS, "anomalous", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(RS); + + rrddim_set_by_pointer(RS, NumTotalDimensionsRD, NumNormalDimensions + NumAnomalousDimensions); + rrddim_set_by_pointer(RS, NumTrainedDimensionsRD, NumTrainedDimensions); + rrddim_set_by_pointer(RS, NumNormalDimensionsRD, NumNormalDimensions); + rrddim_set_by_pointer(RS, NumAnomalousDimensionsRD, NumAnomalousDimensions); + + rrdset_done(RS); +} + +void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyRate) { + static thread_local RRDSET *HostRateRS = nullptr; + static thread_local RRDDIM *AnomalyRateRD = nullptr; + + if (!HostRateRS) { + std::stringstream IdSS, NameSS; + + IdSS << "anomaly_rate_on_" << localhost->machine_guid; + NameSS << "anomaly_rate_on_" << localhost->hostname; + + HostRateRS = rrdset_create( + RH, + "anomaly_detection", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "anomaly_rate", // family + "anomaly_detection.anomaly_rate", // ctx + "Percentage of anomalous dimensions", // title + "percentage", // units + "netdata", // plugin + "ml", // module + 39184, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(HostRateRS, RRDSET_FLAG_ANOMALY_DETECTION); + + AnomalyRateRD = rrddim_add(HostRateRS, "anomaly_rate", NULL, + 1, 100, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(HostRateRS); + + rrddim_set_by_pointer(HostRateRS, AnomalyRateRD, AnomalyRate); + rrdset_done(HostRateRS); + + static thread_local RRDSET *AnomalyDetectionRS = nullptr; + static thread_local RRDDIM *AboveThresholdRD = nullptr; + static thread_local RRDDIM *NewAnomalyEventRD = nullptr; + + if (!AnomalyDetectionRS) { + std::stringstream IdSS, NameSS; + + IdSS << "anomaly_detection_on_" << localhost->machine_guid; + NameSS << "anomaly_detection_on_" << localhost->hostname; + + AnomalyDetectionRS = rrdset_create( + RH, + "anomaly_detection", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "anomaly_detection", // family + "anomaly_detection.detector_events", // ctx + "Anomaly detection events", // title + "percentage", // units + "netdata", // plugin + "ml", // module + 39185, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_LINE // chart_type + ); + rrdset_flag_set(AnomalyDetectionRS, RRDSET_FLAG_ANOMALY_DETECTION); + + AboveThresholdRD = rrddim_add(AnomalyDetectionRS, "above_threshold", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + NewAnomalyEventRD = rrddim_add(AnomalyDetectionRS, "new_anomaly_event", NULL, + 1, 1, RRD_ALGORITHM_ABSOLUTE); + } else + rrdset_next(AnomalyDetectionRS); + + /* + * Compute the values of the dimensions based on the host rate chart + */ + ONEWAYALLOC *OWA = onewayalloc_create(0); + time_t Now = now_realtime_sec(); + time_t Before = Now - RH->rrd_update_every; + time_t After = Before - Cfg.AnomalyDetectionQueryDuration; + RRDR_OPTIONS Options = static_cast(0x00000000); + + RRDR *R = rrd2rrdr( + OWA, HostRateRS, + 1 /* points wanted */, + After, + Before, + Cfg.AnomalyDetectionGroupingMethod, + 0 /* resampling time */, + Options, "anomaly_rate", + NULL /* context param list */, + NULL /* group options */, + 0, /* timeout */ + 0 /* tier */ + ); + assert(R->d == 1 && R->n == 1 && R->rows == 1); + + static thread_local bool PrevAboveThreshold = false; + bool AboveThreshold = R->v[0] >= Cfg.HostAnomalyRateThreshold; + bool NewAnomalyEvent = AboveThreshold && !PrevAboveThreshold; + PrevAboveThreshold = AboveThreshold; + + rrddim_set_by_pointer(AnomalyDetectionRS, AboveThresholdRD, AboveThreshold); + rrddim_set_by_pointer(AnomalyDetectionRS, NewAnomalyEventRD, NewAnomalyEvent); + rrdset_done(AnomalyDetectionRS); + + rrdr_free(OWA, R); + onewayalloc_destroy(OWA); +} + +void ml::updateDetectionChart(RRDHOST *RH) { + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *UserRD, *SystemRD = nullptr; + + if (!RS) { + std::stringstream IdSS, NameSS; + + IdSS << "prediction_stats_" << RH->machine_guid; + NameSS << "prediction_stats_for_" << RH->hostname; + + RS = rrdset_create_localhost( + "netdata", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "ml", // family + "netdata.prediction_stats", // ctx + "Prediction thread CPU usage", // title + "milliseconds/s", // units + "netdata", // plugin + "ml", // module + 136000, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_STACKED // chart_type + ); + + UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + } else + rrdset_next(RS); + + struct rusage TRU; + getrusage(RUSAGE_THREAD, &TRU); + + rrddim_set_by_pointer(RS, UserRD, TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec); + rrddim_set_by_pointer(RS, SystemRD, TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec); + rrdset_done(RS); +} + +void ml::updateTrainingChart(RRDHOST *RH, struct rusage *TRU) { + static thread_local RRDSET *RS = nullptr; + static thread_local RRDDIM *UserRD = nullptr; + static thread_local RRDDIM *SystemRD = nullptr; + + if (!RS) { + std::stringstream IdSS, NameSS; + + IdSS << "training_stats_" << RH->machine_guid; + NameSS << "training_stats_for_" << RH->hostname; + + RS = rrdset_create_localhost( + "netdata", // type + IdSS.str().c_str(), // id + NameSS.str().c_str(), // name + "ml", // family + "netdata.training_stats", // ctx + "Training thread CPU usage", // title + "milliseconds/s", // units + "netdata", // plugin + "ml", // module + 136001, // priority + RH->rrd_update_every, // update_every + RRDSET_TYPE_STACKED // chart_type + ); + + UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); + } else + rrdset_next(RS); + + rrddim_set_by_pointer(RS, UserRD, TRU->ru_utime.tv_sec * 1000000ULL + TRU->ru_utime.tv_usec); + rrddim_set_by_pointer(RS, SystemRD, TRU->ru_stime.tv_sec * 1000000ULL + TRU->ru_stime.tv_usec); + rrdset_done(RS); +} diff --git a/ml/ADCharts.h b/ml/ADCharts.h new file mode 100644 index 0000000000..0be324f7d7 --- /dev/null +++ b/ml/ADCharts.h @@ -0,0 +1,23 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef ML_ADCHARTS_H +#define ML_ADCHARTS_H + +#include "ml-private.h" + +namespace ml { + +void updateDimensionsChart(RRDHOST *RH, + collected_number NumTrainedDimensions, + collected_number NumNormalDimensions, + collected_number NumAnomalousDimensions); + +void updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyRate); + +void updateDetectionChart(RRDHOST *RH); + +void updateTrainingChart(RRDHOST *RH, struct rusage *TRU); + +} // namespace ml + +#endif /* ML_ADCHARTS_H */ diff --git a/ml/BitBufferCounter.cc b/ml/BitBufferCounter.cc deleted file mode 100644 index 5e1ab5aca3..0000000000 --- a/ml/BitBufferCounter.cc +++ /dev/null @@ -1,29 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "BitBufferCounter.h" - -using namespace ml; - -std::vector BitBufferCounter::getBuffer() const { - std::vector Buffer; - - for (size_t Idx = start(); Idx != (start() + size()); Idx++) - Buffer.push_back(V[Idx % V.size()]); - - return Buffer; -} - -void BitBufferCounter::insert(bool Bit) { - if (N >= V.size()) - NumSetBits -= (V[start()] == true); - - NumSetBits += (Bit == true); - V[N++ % V.size()] = Bit; -} - -void BitBufferCounter::print(std::ostream &OS) const { - std::vector Buffer = getBuffer(); - - for (bool B : Buffer) - OS << B; -} diff --git a/ml/BitBufferCounter.h b/ml/BitBufferCounter.h deleted file mode 100644 index db924d7762..0000000000 --- a/ml/BitBufferCounter.h +++ /dev/null @@ -1,54 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef BIT_BUFFER_COUNTER_H -#define BIT_BUFFER_COUNTER_H - -#include "ml-private.h" - -namespace ml { - -class BitBufferCounter { -public: - BitBufferCounter(size_t Capacity) : V(Capacity, 0), NumSetBits(0), N(0) {} - - std::vector getBuffer() const; - - void insert(bool Bit); - - void print(std::ostream &OS) const; - - bool isFilled() const { - return N >= V.size(); - } - - size_t numSetBits() const { - return NumSetBits; - } - -private: - inline size_t size() const { - return N < V.size() ? N : V.size(); - } - - inline size_t start() const { - if (N <= V.size()) - return 0; - - return N % V.size(); - } - -private: - std::vector V; - size_t NumSetBits; - - size_t N; -}; - -} // namespace ml - -inline std::ostream& operator<<(std::ostream &OS, const ml::BitBufferCounter &BBC) { - BBC.print(OS); - return OS; -} - -#endif /* BIT_BUFFER_COUNTER_H */ diff --git a/ml/BitRateWindow.cc b/ml/BitRateWindow.cc deleted file mode 100644 index c4c994c42d..0000000000 --- a/ml/BitRateWindow.cc +++ /dev/null @@ -1,75 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "BitRateWindow.h" - -using namespace ml; - -std::pair BitRateWindow::insert(bool Bit) { - Edge E; - - BBC.insert(Bit); - switch (CurrState) { - case State::NotFilled: { - if (BBC.isFilled()) { - if (BBC.numSetBits() < SetBitsThreshold) { - CurrState = State::BelowThreshold; - } else { - CurrState = State::AboveThreshold; - } - } else { - CurrState = State::NotFilled; - } - - E = {State::NotFilled, CurrState}; - break; - } case State::BelowThreshold: { - if (BBC.numSetBits() >= SetBitsThreshold) { - CurrState = State::AboveThreshold; - } - - E = {State::BelowThreshold, CurrState}; - break; - } case State::AboveThreshold: { - if ((BBC.numSetBits() < SetBitsThreshold) || - (CurrLength == MaxLength)) { - CurrState = State::Idle; - } - - E = {State::AboveThreshold, CurrState}; - break; - } case State::Idle: { - if (CurrLength == IdleLength) { - CurrState = State::NotFilled; - } - - E = {State::Idle, CurrState}; - break; - } - } - - Action A = EdgeActions[E]; - size_t L = (this->*A)(E.first, Bit); - return {E, L}; -} - -void BitRateWindow::print(std::ostream &OS) const { - switch (CurrState) { - case State::NotFilled: - OS << "NotFilled"; - break; - case State::BelowThreshold: - OS << "BelowThreshold"; - break; - case State::AboveThreshold: - OS << "AboveThreshold"; - break; - case State::Idle: - OS << "Idle"; - break; - default: - OS << "UnknownState"; - break; - } - - OS << ": " << BBC << " (Current Length: " << CurrLength << ")"; -} diff --git a/ml/BitRateWindow.h b/ml/BitRateWindow.h deleted file mode 100644 index 0d99008b85..0000000000 --- a/ml/BitRateWindow.h +++ /dev/null @@ -1,170 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef BIT_RATE_WINDOW_H -#define BIT_RATE_WINDOW_H - -#include "BitBufferCounter.h" -#include "ml-private.h" - -namespace ml { - -class BitRateWindow { -public: - enum class State { - NotFilled, - BelowThreshold, - AboveThreshold, - Idle - }; - - using Edge = std::pair; - using Action = size_t (BitRateWindow::*)(State PrevState, bool NewBit); - -private: - std::map EdgeActions = { - // From == To - { - Edge(State::NotFilled, State::NotFilled), - &BitRateWindow::onRoundtripNotFilled, - }, - { - Edge(State::BelowThreshold, State::BelowThreshold), - &BitRateWindow::onRoundtripBelowThreshold, - }, - { - Edge(State::AboveThreshold, State::AboveThreshold), - &BitRateWindow::onRoundtripAboveThreshold, - }, - { - Edge(State::Idle, State::Idle), - &BitRateWindow::onRoundtripIdle, - }, - - - // NotFilled => {BelowThreshold, AboveThreshold} - { - Edge(State::NotFilled, State::BelowThreshold), - &BitRateWindow::onNotFilledToBelowThreshold - }, - { - Edge(State::NotFilled, State::AboveThreshold), - &BitRateWindow::onNotFilledToAboveThreshold - }, - - // BelowThreshold => AboveThreshold - { - Edge(State::BelowThreshold, State::AboveThreshold), - &BitRateWindow::onBelowToAboveThreshold - }, - - // AboveThreshold => Idle - { - Edge(State::AboveThreshold, State::Idle), - &BitRateWindow::onAboveThresholdToIdle - }, - - // Idle => NotFilled - { - Edge(State::Idle, State::NotFilled), - &BitRateWindow::onIdleToNotFilled - }, - }; - -public: - BitRateWindow(size_t MinLength, size_t MaxLength, size_t IdleLength, - size_t SetBitsThreshold) : - MinLength(MinLength), MaxLength(MaxLength), IdleLength(IdleLength), - SetBitsThreshold(SetBitsThreshold), - CurrState(State::NotFilled), CurrLength(0), BBC(MinLength) {} - - std::pair insert(bool Bit); - - void print(std::ostream &OS) const; - -private: - size_t onRoundtripNotFilled(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength += 1; - return CurrLength; - } - - size_t onRoundtripBelowThreshold(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength = MinLength; - return CurrLength; - } - - size_t onRoundtripAboveThreshold(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength += 1; - return CurrLength; - } - - size_t onRoundtripIdle(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength += 1; - return CurrLength; - } - - size_t onNotFilledToBelowThreshold(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength = MinLength; - return CurrLength; - } - - size_t onNotFilledToAboveThreshold(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength += 1; - return CurrLength; - } - - size_t onBelowToAboveThreshold(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - CurrLength = MinLength; - return CurrLength; - } - - size_t onAboveThresholdToIdle(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - size_t PrevLength = CurrLength; - CurrLength = 1; - return PrevLength; - } - - size_t onIdleToNotFilled(State PrevState, bool NewBit) { - (void) PrevState, (void) NewBit; - - BBC = BitBufferCounter(MinLength); - BBC.insert(NewBit); - - CurrLength = 1; - return CurrLength; - } - -private: - size_t MinLength; - size_t MaxLength; - size_t IdleLength; - size_t SetBitsThreshold; - - State CurrState; - size_t CurrLength; - BitBufferCounter BBC; -}; - -} // namespace ml - -inline std::ostream& operator<<(std::ostream &OS, const ml::BitRateWindow BRW) { - BRW.print(OS); - return OS; -} - -#endif /* BIT_RATE_WINDOW_H */ diff --git a/ml/Config.cc b/ml/Config.cc index 65b05a34d6..63b570156a 100644 --- a/ml/Config.cc +++ b/ml/Config.cc @@ -31,6 +31,7 @@ void Config::readMLConfig(void) { unsigned MaxTrainSamples = config_get_number(ConfigSectionML, "maximum num samples to train", 4 * 3600); unsigned MinTrainSamples = config_get_number(ConfigSectionML, "minimum num samples to train", 1 * 900); unsigned TrainEvery = config_get_number(ConfigSectionML, "train every", 1 * 3600); + unsigned NumModelsToUse = config_get_number(ConfigSectionML, "number of models per dimension", 1 * 24); unsigned DBEngineAnomalyRateEvery = config_get_number(ConfigSectionML, "dbengine anomaly rate every", 30); @@ -42,25 +43,19 @@ void Config::readMLConfig(void) { unsigned MaxKMeansIters = config_get_number(ConfigSectionML, "maximum number of k-means iterations", 1000); double DimensionAnomalyScoreThreshold = config_get_float(ConfigSectionML, "dimension anomaly score threshold", 0.99); - double HostAnomalyRateThreshold = config_get_float(ConfigSectionML, "host anomaly rate threshold", 0.01); - double ADMinWindowSize = config_get_float(ConfigSectionML, "minimum window size", 30); - double ADMaxWindowSize = config_get_float(ConfigSectionML, "maximum window size", 600); - double ADIdleWindowSize = config_get_float(ConfigSectionML, "idle window size", 30); - double ADWindowRateThreshold = config_get_float(ConfigSectionML, "window minimum anomaly rate", 0.25); - double ADDimensionRateThreshold = config_get_float(ConfigSectionML, "anomaly event min dimension rate threshold", 0.05); - - std::stringstream SS; - SS << netdata_configured_cache_dir << "/anomaly-detection.db"; - Cfg.AnomalyDBPath = SS.str(); + double HostAnomalyRateThreshold = config_get_float(ConfigSectionML, "host anomaly rate threshold", 1.0); + std::string AnomalyDetectionGroupingMethod = config_get(ConfigSectionML, "anomaly detection grouping method", "average"); + time_t AnomalyDetectionQueryDuration = config_get_number(ConfigSectionML, "anomaly detection grouping duration", 5 * 60); /* * Clamp */ - MaxTrainSamples = clamp(MaxTrainSamples, 1 * 3600u, 24 * 3600u); - MinTrainSamples = clamp(MinTrainSamples, 1 * 900u, 6 * 3600u); - TrainEvery = clamp(TrainEvery, 1 * 3600u, 6 * 3600u); + MaxTrainSamples = clamp(MaxTrainSamples, 1 * 3600, 24 * 3600); + MinTrainSamples = clamp(MinTrainSamples, 1 * 900, 6 * 3600); + TrainEvery = clamp(TrainEvery, 1 * 3600, 6 * 3600); + NumModelsToUse = clamp(TrainEvery, 1, 7 * 24); DBEngineAnomalyRateEvery = clamp(DBEngineAnomalyRateEvery, 1 * 30u, 15 * 60u); @@ -72,13 +67,9 @@ void Config::readMLConfig(void) { MaxKMeansIters = clamp(MaxKMeansIters, 500u, 1000u); DimensionAnomalyScoreThreshold = clamp(DimensionAnomalyScoreThreshold, 0.01, 5.00); - HostAnomalyRateThreshold = clamp(HostAnomalyRateThreshold, 0.01, 1.0); - ADMinWindowSize = clamp(ADMinWindowSize, 30.0, 300.0); - ADMaxWindowSize = clamp(ADMaxWindowSize, 60.0, 900.0); - ADIdleWindowSize = clamp(ADIdleWindowSize, 30.0, 900.0); - ADWindowRateThreshold = clamp(ADWindowRateThreshold, 0.01, 0.99); - ADDimensionRateThreshold = clamp(ADDimensionRateThreshold, 0.01, 0.99); + HostAnomalyRateThreshold = clamp(HostAnomalyRateThreshold, 0.1, 10.0); + AnomalyDetectionQueryDuration = clamp(AnomalyDetectionQueryDuration, 60, 15 * 60); /* * Validate @@ -91,13 +82,6 @@ void Config::readMLConfig(void) { MaxTrainSamples = 4 * 3600; } - if (ADMinWindowSize >= ADMaxWindowSize) { - error("invalid min/max anomaly window size found (%lf >= %lf)", ADMinWindowSize, ADMaxWindowSize); - - ADMinWindowSize = 30.0; - ADMaxWindowSize = 600.0; - } - /* * Assign to config instance */ @@ -107,6 +91,7 @@ void Config::readMLConfig(void) { Cfg.MaxTrainSamples = MaxTrainSamples; Cfg.MinTrainSamples = MinTrainSamples; Cfg.TrainEvery = TrainEvery; + Cfg.NumModelsToUse = NumModelsToUse; Cfg.DBEngineAnomalyRateEvery = DBEngineAnomalyRateEvery; @@ -118,13 +103,10 @@ void Config::readMLConfig(void) { Cfg.MaxKMeansIters = MaxKMeansIters; Cfg.DimensionAnomalyScoreThreshold = DimensionAnomalyScoreThreshold; - Cfg.HostAnomalyRateThreshold = HostAnomalyRateThreshold; - Cfg.ADMinWindowSize = ADMinWindowSize; - Cfg.ADMaxWindowSize = ADMaxWindowSize; - Cfg.ADIdleWindowSize = ADIdleWindowSize; - Cfg.ADWindowRateThreshold = ADWindowRateThreshold; - Cfg.ADDimensionRateThreshold = ADDimensionRateThreshold; + Cfg.HostAnomalyRateThreshold = HostAnomalyRateThreshold; + Cfg.AnomalyDetectionGroupingMethod = web_client_api_request_v1_data_group(AnomalyDetectionGroupingMethod.c_str(), RRDR_GROUPING_AVERAGE); + Cfg.AnomalyDetectionQueryDuration = AnomalyDetectionQueryDuration; Cfg.HostsToSkip = config_get(ConfigSectionML, "hosts to skip from training", "!*"); Cfg.SP_HostsToSkip = simple_pattern_create(Cfg.HostsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT); diff --git a/ml/Config.h b/ml/Config.h index 595fd072bb..d876d4aa41 100644 --- a/ml/Config.h +++ b/ml/Config.h @@ -14,6 +14,7 @@ public: unsigned MaxTrainSamples; unsigned MinTrainSamples; unsigned TrainEvery; + unsigned NumModelsToUse; unsigned DBEngineAnomalyRateEvery; @@ -25,13 +26,10 @@ public: unsigned MaxKMeansIters; double DimensionAnomalyScoreThreshold; - double HostAnomalyRateThreshold; - double ADMinWindowSize; - double ADMaxWindowSize; - double ADIdleWindowSize; - double ADWindowRateThreshold; - double ADDimensionRateThreshold; + double HostAnomalyRateThreshold; + RRDR_GROUPING AnomalyDetectionGroupingMethod; + time_t AnomalyDetectionQueryDuration; bool StreamADCharts; @@ -41,7 +39,6 @@ public: std::string ChartsToSkip; SIMPLE_PATTERN *SP_ChartsToSkip; - std::string AnomalyDBPath; std::vector RandomNums; void readMLConfig(); diff --git a/ml/Database.cc b/ml/Database.cc deleted file mode 100644 index b46f627f6a..0000000000 --- a/ml/Database.cc +++ /dev/null @@ -1,127 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "Database.h" - -const char *ml::Database::SQL_CREATE_ANOMALIES_TABLE = - "CREATE TABLE IF NOT EXISTS anomaly_events( " - " anomaly_detector_name text NOT NULL, " - " anomaly_detector_version int NOT NULL, " - " host_id text NOT NULL, " - " after int NOT NULL, " - " before int NOT NULL, " - " anomaly_event_info text, " - " PRIMARY KEY( " - " anomaly_detector_name, anomaly_detector_version, " - " host_id, after, before " - " ) " - ");"; - -const char *ml::Database::SQL_INSERT_ANOMALY = - "INSERT INTO anomaly_events( " - " anomaly_detector_name, anomaly_detector_version, " - " host_id, after, before, anomaly_event_info) " - "VALUES (?1, ?2, ?3, ?4, ?5, ?6);"; - -const char *ml::Database::SQL_SELECT_ANOMALY = - "SELECT anomaly_event_info FROM anomaly_events WHERE" - " anomaly_detector_name == ?1 AND" - " anomaly_detector_version == ?2 AND" - " host_id == ?3 AND" - " after == ?4 AND" - " before == ?5;"; - -const char *ml::Database::SQL_SELECT_ANOMALY_EVENTS = - "SELECT after, before FROM anomaly_events WHERE" - " anomaly_detector_name == ?1 AND" - " anomaly_detector_version == ?2 AND" - " host_id == ?3 AND" - " after >= ?4 AND" - " before <= ?5;"; - -using namespace ml; - -bool Statement::prepare(sqlite3 *Conn) { - if (!Conn) - return false; - - if (ParsedStmt) - return true; - - int RC = sqlite3_prepare_v2(Conn, RawStmt, -1, &ParsedStmt, nullptr); - if (RC == SQLITE_OK) - return true; - - std::string Msg = "Statement \"%s\" preparation failed due to \"%s\""; - error(Msg.c_str(), RawStmt, sqlite3_errstr(RC)); - - return false; -} - -bool Statement::bindValue(size_t Pos, const std::string &Value) { - int RC = sqlite3_bind_text(ParsedStmt, Pos, Value.c_str(), -1, SQLITE_TRANSIENT); - if (RC == SQLITE_OK) - return true; - - error("Failed to bind text '%s' (pos = %zu) in statement '%s'.", Value.c_str(), Pos, RawStmt); - return false; -} - -bool Statement::bindValue(size_t Pos, const int Value) { - int RC = sqlite3_bind_int(ParsedStmt, Pos, Value); - if (RC == SQLITE_OK) - return true; - - error("Failed to bind integer %d (pos = %zu) in statement '%s'.", Value, Pos, RawStmt); - return false; -} - -bool Statement::resetAndClear(bool Ret) { - int RC = sqlite3_reset(ParsedStmt); - if (RC != SQLITE_OK) { - error("Could not reset statement: '%s'", RawStmt); - return false; - } - - RC = sqlite3_clear_bindings(ParsedStmt); - if (RC != SQLITE_OK) { - error("Could not clear bindings in statement: '%s'", RawStmt); - return false; - } - - return Ret; -} - -Database::Database(const std::string &Path) { - // Get sqlite3 connection handle. - int RC = sqlite3_open(Path.c_str(), &Conn); - if (RC != SQLITE_OK) { - std::string Msg = "Failed to initialize ML DB at %s, due to \"%s\""; - error(Msg.c_str(), Path.c_str(), sqlite3_errstr(RC)); - - sqlite3_close(Conn); - Conn = nullptr; - return; - } - - // Create anomaly events table if it does not exist. - char *ErrMsg; - RC = sqlite3_exec_monitored(Conn, SQL_CREATE_ANOMALIES_TABLE, nullptr, nullptr, &ErrMsg); - if (RC == SQLITE_OK) - return; - - error("SQLite error during database initialization, rc = %d (%s)", RC, ErrMsg); - error("SQLite failed statement: %s", SQL_CREATE_ANOMALIES_TABLE); - - sqlite3_free(ErrMsg); - sqlite3_close(Conn); - Conn = nullptr; -} - -Database::~Database() { - if (!Conn) - return; - - int RC = sqlite3_close(Conn); - if (RC != SQLITE_OK) - error("Could not close connection properly (rc=%d)", RC); -} diff --git a/ml/Database.h b/ml/Database.h deleted file mode 100644 index f14755d1bd..0000000000 --- a/ml/Database.h +++ /dev/null @@ -1,131 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef ML_DATABASE_H -#define ML_DATABASE_H - -#include "Dimension.h" -#include "ml-private.h" - -#include "json/single_include/nlohmann/json.hpp" - -namespace ml { - -class Statement { -public: - using RowCallback = std::function; - -public: - Statement(const char *RawStmt) : RawStmt(RawStmt), ParsedStmt(nullptr) {} - - template - bool exec(sqlite3 *Conn, RowCallback RowCb, ArgTypes ...Args) { - if (!prepare(Conn)) - return false; - - switch (bind(1, Args...)) { - case 0: - return false; - case sizeof...(Args): - break; - default: - return resetAndClear(false); - } - - while (true) { - switch (int RC = sqlite3_step_monitored(ParsedStmt)) { - case SQLITE_BUSY: case SQLITE_LOCKED: - usleep(SQLITE_INSERT_DELAY * USEC_PER_MS); - continue; - case SQLITE_ROW: - RowCb(ParsedStmt); - continue; - case SQLITE_DONE: - return resetAndClear(true); - default: - error("Stepping through '%s' returned rc=%d", RawStmt, RC); - return resetAndClear(false); - } - } - } - - ~Statement() { - if (!ParsedStmt) - return; - - int RC = sqlite3_finalize(ParsedStmt); - if (RC != SQLITE_OK) - error("Could not properly finalize statement (rc=%d)", RC); - } - -private: - bool prepare(sqlite3 *Conn); - - bool bindValue(size_t Pos, const int Value); - bool bindValue(size_t Pos, const std::string &Value); - - template - size_t bind(size_t Pos, ArgType T) { - return bindValue(Pos, T); - } - - template - size_t bind(size_t Pos, ArgType T, ArgTypes ...Args) { - return bindValue(Pos, T) + bind(Pos + 1, Args...); - } - - bool resetAndClear(bool Ret); - -private: - const char *RawStmt; - sqlite3_stmt *ParsedStmt; -}; - -class Database { -private: - static const char *SQL_CREATE_ANOMALIES_TABLE; - static const char *SQL_INSERT_ANOMALY; - static const char *SQL_SELECT_ANOMALY; - static const char *SQL_SELECT_ANOMALY_EVENTS; - -public: - Database(const std::string &Path); - - ~Database(); - - template - bool insertAnomaly(ArgTypes... Args) { - Statement::RowCallback RowCb = [](sqlite3_stmt *Stmt) { (void) Stmt; }; - return InsertAnomalyStmt.exec(Conn, RowCb, Args...); - } - - template - bool getAnomalyInfo(nlohmann::json &Json, ArgTypes&&... Args) { - Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) { - const char *Text = static_cast(sqlite3_column_blob(Stmt, 0)); - Json = nlohmann::json::parse(Text); - }; - return GetAnomalyInfoStmt.exec(Conn, RowCb, Args...); - } - - template - bool getAnomaliesInRange(std::vector> &V, ArgTypes&&... Args) { - Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) { - V.push_back({ - sqlite3_column_int64(Stmt, 0), - sqlite3_column_int64(Stmt, 1) - }); - }; - return GetAnomaliesInRangeStmt.exec(Conn, RowCb, Args...); - } - -private: - sqlite3 *Conn; - - Statement InsertAnomalyStmt{SQL_INSERT_ANOMALY}; - Statement GetAnomalyInfoStmt{SQL_SELECT_ANOMALY}; - Statement GetAnomaliesInRangeStmt{SQL_SELECT_ANOMALY_EVENTS}; -}; - -} - -#endif /* ML_DATABASE_H */ diff --git a/ml/Dimension.cc b/ml/Dimension.cc index 0fe07530c8..39e74a065d 100644 --- a/ml/Dimension.cc +++ b/ml/Dimension.cc @@ -6,8 +6,13 @@ using namespace ml; -std::pair -TrainableDimension::getCalculatedNumbers() { +bool Dimension::isActive() const { + bool SetObsolete = rrdset_flag_check(RD->rrdset, RRDSET_FLAG_OBSOLETE); + bool DimObsolete = rrddim_flag_check(RD, RRDDIM_FLAG_OBSOLETE); + return !SetObsolete && !DimObsolete; +} + +std::pair Dimension::getCalculatedNumbers() { size_t MinN = Cfg.MinTrainSamples; size_t MaxN = Cfg.MaxTrainSamples; @@ -68,7 +73,7 @@ TrainableDimension::getCalculatedNumbers() { return { CNs, TotalValues }; } -MLResult TrainableDimension::trainModel() { +MLResult Dimension::trainModel() { auto P = getCalculatedNumbers(); CalculatedNumber *CNs = P.first; unsigned N = P.second; @@ -81,7 +86,15 @@ MLResult TrainableDimension::trainModel() { SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN, SamplingRatio, Cfg.RandomNums); - KM.train(SB, Cfg.MaxKMeansIters); + std::vector Samples = SB.preprocess(); + + KMeans KM; + KM.train(Samples, Cfg.MaxKMeansIters); + + { + std::lock_guard Lock(Mutex); + Models[0] = KM; + } Trained = true; ConstantModel = true; @@ -90,16 +103,25 @@ MLResult TrainableDimension::trainModel() { return MLResult::Success; } -void PredictableDimension::addValue(CalculatedNumber Value, bool Exists) { +bool Dimension::shouldTrain(const TimePoint &TP) const { + if (ConstantModel) + return false; + + return (LastTrainedAt + Seconds(Cfg.TrainEvery * updateEvery())) < TP; +} + +bool Dimension::predict(CalculatedNumber Value, bool Exists) { if (!Exists) { CNs.clear(); - return; + AnomalyBit = false; + return false; } unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN; if (CNs.size() < N) { CNs.push_back(Value); - return; + AnomalyBit = false; + return false; } std::rotate(std::begin(CNs), std::begin(CNs) + 1, std::end(CNs)); @@ -108,28 +130,54 @@ void PredictableDimension::addValue(CalculatedNumber Value, bool Exists) { ConstantModel = false; CNs[N - 1] = Value; -} -std::pair PredictableDimension::predict() { - unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN; - if (CNs.size() != N) { + if (!isTrained() || ConstantModel) { AnomalyBit = false; - return { MLResult::MissingData, AnomalyBit }; + return false; } CalculatedNumber *TmpCNs = new CalculatedNumber[N * (Cfg.LagN + 1)](); std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber)); - - SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN, + SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, + Cfg.DiffN, Cfg.SmoothN, Cfg.LagN, 1.0, Cfg.RandomNums); - AnomalyScore = computeAnomalyScore(SB); + const DSample Sample = SB.preprocess().back(); delete[] TmpCNs; - if (AnomalyScore == std::numeric_limits::quiet_NaN()) { + std::unique_lock Lock(Mutex, std::defer_lock); + if (!Lock.try_lock()) { AnomalyBit = false; - return { MLResult::NaN, AnomalyBit }; + return false; + } + + for (const auto &KM : Models) { + double AnomalyScore = KM.anomalyScore(Sample); + if (AnomalyScore == std::numeric_limits::quiet_NaN()) { + AnomalyBit = false; + continue; + } + + if (AnomalyScore < (100 * Cfg.DimensionAnomalyScoreThreshold)) { + AnomalyBit = false; + return false; + } + } + + AnomalyBit = true; + return true; +} + +void Dimension::updateAnomalyBitCounter(RRDSET *RS, unsigned Elapsed, bool IsAnomalous) { + AnomalyBitCounter += IsAnomalous; + + if (Elapsed == Cfg.DBEngineAnomalyRateEvery) { + double AR = static_cast(AnomalyBitCounter) / Cfg.DBEngineAnomalyRateEvery; + rrddim_set_by_pointer(RS, getAnomalyRateRD(), AR * 1000); + AnomalyBitCounter = 0; } +} - AnomalyBit = AnomalyScore >= (100 * Cfg.DimensionAnomalyScoreThreshold); - return { MLResult::Success, AnomalyBit }; +std::array Dimension::getModels() { + std::unique_lock Lock(Mutex); + return Models; } diff --git a/ml/Dimension.h b/ml/Dimension.h index d61d385fa7..eb37f0755a 100644 --- a/ml/Dimension.h +++ b/ml/Dimension.h @@ -3,155 +3,106 @@ #ifndef ML_DIMENSION_H #define ML_DIMENSION_H -#include "BitBufferCounter.h" +#include "Query.h" #include "Config.h" #include "ml-private.h" namespace ml { -class RrdDimension { -public: - RrdDimension(RRDDIM *RD) : RD(RD), Ops(&RD->tiers[0]->query_ops) { } - - RRDDIM *getRD() const { return RD; } - - time_t latestTime() { return Ops->latest_time(RD->tiers[0]->db_metric_handle); } - - time_t oldestTime() { return Ops->oldest_time(RD->tiers[0]->db_metric_handle); } +enum class MLResult { + Success = 0, + MissingData, + NaN, +}; - unsigned updateEvery() const { return RD->update_every; } +static inline std::string getMLDimensionID(RRDDIM *RD) { + RRDSET *RS = RD->rrdset; - const std::string getID() const { - RRDSET *RS = RD->rrdset; + std::stringstream SS; + SS << rrdset_context(RS) << "|" << rrdset_id(RS) << "|" << rrddim_name(RD); + return SS.str(); +} - std::stringstream SS; - SS << rrdset_context(RS) << "|" << rrdset_id(RS) << "|" << rrddim_name(RD); - return SS.str(); +class Dimension { +public: + Dimension(RRDDIM *RD, RRDSET *AnomalyRateRS) : + RD(RD), + AnomalyRateRD(rrddim_add(AnomalyRateRS, ml::getMLDimensionID(RD).c_str(), NULL, 1, 1000, RRD_ALGORITHM_ABSOLUTE)), + LastTrainedAt(Seconds(0)), + Trained(false), + ConstantModel(false), + AnomalyScore(0.0), + AnomalyBit(0), + AnomalyBitCounter(0) + { } + + RRDDIM *getRD() const { + return RD; } - bool isActive() const { - if (rrdset_flag_check(RD->rrdset, RRDSET_FLAG_OBSOLETE)) - return false; - - if (rrddim_flag_check(RD, RRDDIM_FLAG_OBSOLETE)) - return false; + unsigned updateEvery() const { + return RD->update_every; + } - return true; + time_t latestTime() const { + return Query(RD).latestTime(); } - void setAnomalyRateRD(RRDDIM *ARRD) { AnomalyRateRD = ARRD; } - RRDDIM *getAnomalyRateRD() const { return AnomalyRateRD; } + time_t oldestTime() const { + return Query(RD).oldestTime(); + } void setAnomalyRateRDName(const char *Name) const { rrddim_reset_name(AnomalyRateRD->rrdset, AnomalyRateRD, Name); } - virtual ~RrdDimension() {} - -private: - RRDDIM *RD; - RRDDIM *AnomalyRateRD; + RRDDIM *getAnomalyRateRD() const { + return AnomalyRateRD; + } - struct rrddim_query_ops *Ops; + bool isTrained() const { + return Trained; + } - std::string ID; -}; + bool isAnomalous() const { + return AnomalyBit; + } -enum class MLResult { - Success = 0, - MissingData, - NaN, -}; + bool shouldTrain(const TimePoint &TP) const; -class TrainableDimension : public RrdDimension { -public: - TrainableDimension(RRDDIM *RD) : - RrdDimension(RD), TrainEvery(Cfg.TrainEvery * updateEvery()) {} + bool isActive() const; MLResult trainModel(); - CalculatedNumber computeAnomalyScore(SamplesBuffer &SB) { - return Trained ? KM.anomalyScore(SB) : 0.0; - } + bool predict(CalculatedNumber Value, bool Exists); - bool shouldTrain(const TimePoint &TP) const { - if (ConstantModel) - return false; + void updateAnomalyBitCounter(RRDSET *RS, unsigned Elapsed, bool IsAnomalous); - return (LastTrainedAt + TrainEvery) < TP; - } + std::pair detect(size_t WindowLength, bool Reset); - bool isTrained() const { return Trained; } + std::array getModels(); private: std::pair getCalculatedNumbers(); public: - TimePoint LastTrainedAt{Seconds{0}}; - -protected: - std::atomic ConstantModel{false}; - -private: - Seconds TrainEvery; - KMeans KM; - - std::atomic Trained{false}; -}; - -class PredictableDimension : public TrainableDimension { -public: - PredictableDimension(RRDDIM *RD) : TrainableDimension(RD) {} - - std::pair predict(); - - void addValue(CalculatedNumber Value, bool Exists); - - bool isAnomalous() { return AnomalyBit; } + RRDDIM *RD; + RRDDIM *AnomalyRateRD; - void updateAnomalyBitCounter(RRDSET *RS, unsigned Elapsed, bool IsAnomalous) { - AnomalyBitCounter += IsAnomalous; + TimePoint LastTrainedAt; + std::atomic Trained; + std::atomic ConstantModel; - if (Elapsed == Cfg.DBEngineAnomalyRateEvery) { - double AR = static_cast(AnomalyBitCounter) / Cfg.DBEngineAnomalyRateEvery; - rrddim_set_by_pointer(RS, getAnomalyRateRD(), AR * 1000); - AnomalyBitCounter = 0; - } - } - -private: - CalculatedNumber AnomalyScore{0.0}; - std::atomic AnomalyBit{false}; - unsigned AnomalyBitCounter{0}; + CalculatedNumber AnomalyScore; + std::atomic AnomalyBit; + unsigned AnomalyBitCounter; std::vector CNs; + std::array Models; + std::mutex Mutex; }; -class DetectableDimension : public PredictableDimension { -public: - DetectableDimension(RRDDIM *RD) : PredictableDimension(RD) {} - - std::pair detect(size_t WindowLength, bool Reset) { - bool AnomalyBit = isAnomalous(); - - if (Reset) - NumSetBits = BBC.numSetBits(); - - NumSetBits += AnomalyBit; - BBC.insert(AnomalyBit); - - double AnomalyRate = static_cast(NumSetBits) / WindowLength; - return { AnomalyBit, AnomalyRate }; - } - -private: - BitBufferCounter BBC{static_cast(Cfg.ADMinWindowSize)}; - size_t NumSetBits{0}; -}; - -using Dimension = DetectableDimension; - } // namespace ml #endif /* ML_DIMENSION_H */ diff --git a/ml/Host.cc b/ml/Host.cc index a6f9b330a7..0eb35fff0b 100644 --- a/ml/Host.cc +++ b/ml/Host.cc @@ -1,278 +1,20 @@ // SPDX-License-Identifier: GPL-3.0-or-later -#include - #include "Config.h" #include "Host.h" +#include "ADCharts.h" #include "json/single_include/nlohmann/json.hpp" using namespace ml; -static void updateDimensionsChart(RRDHOST *RH, - collected_number NumTrainedDimensions, - collected_number NumNormalDimensions, - collected_number NumAnomalousDimensions) { - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *NumTotalDimensionsRD = nullptr; - static thread_local RRDDIM *NumTrainedDimensionsRD = nullptr; - static thread_local RRDDIM *NumNormalDimensionsRD = nullptr; - static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "dimensions_on_" << localhost->machine_guid; - NameSS << "dimensions_on_" << rrdhost_hostname(localhost); - - RS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "dimensions", // family - "anomaly_detection.dimensions", // ctx - "Anomaly detection dimensions", // title - "dimensions", // units - "netdata", // plugin - "ml", // module - 39183, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - NumTotalDimensionsRD = rrddim_add(RS, "total", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - NumTrainedDimensionsRD = rrddim_add(RS, "trained", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - NumNormalDimensionsRD = rrddim_add(RS, "normal", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - NumAnomalousDimensionsRD = rrddim_add(RS, "anomalous", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(RS); - - rrddim_set_by_pointer(RS, NumTotalDimensionsRD, NumNormalDimensions + NumAnomalousDimensions); - rrddim_set_by_pointer(RS, NumTrainedDimensionsRD, NumTrainedDimensions); - rrddim_set_by_pointer(RS, NumNormalDimensionsRD, NumNormalDimensions); - rrddim_set_by_pointer(RS, NumAnomalousDimensionsRD, NumAnomalousDimensions); - - rrdset_done(RS); -} - -static void updateRateChart(RRDHOST *RH, collected_number AnomalyRate) { - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *AnomalyRateRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "anomaly_rate_on_" << localhost->machine_guid; - NameSS << "anomaly_rate_on_" << rrdhost_hostname(localhost); - - RS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "anomaly_rate", // family - "anomaly_detection.anomaly_rate", // ctx - "Percentage of anomalous dimensions", // title - "percentage", // units - "netdata", // plugin - "ml", // module - 39184, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - AnomalyRateRD = rrddim_add(RS, "anomaly_rate", NULL, - 1, 100, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(RS); - - rrddim_set_by_pointer(RS, AnomalyRateRD, AnomalyRate); - - rrdset_done(RS); -} - -static void updateWindowLengthChart(RRDHOST *RH, collected_number WindowLength) { - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *WindowLengthRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "detector_window_on_" << localhost->machine_guid; - NameSS << "detector_window_on_" << rrdhost_hostname(localhost); - - RS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "detector_window", // family - "anomaly_detection.detector_window", // ctx - "Anomaly detector window length", // title - "seconds", // units - "netdata", // plugin - "ml", // module - 39185, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - WindowLengthRD = rrddim_add(RS, "duration", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(RS); - - rrddim_set_by_pointer(RS, WindowLengthRD, WindowLength * RH->rrd_update_every); - rrdset_done(RS); -} - -static void updateEventsChart(RRDHOST *RH, - std::pair P, - bool ResetBitCounter, - bool NewAnomalyEvent) { - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *AboveThresholdRD = nullptr; - static thread_local RRDDIM *ResetBitCounterRD = nullptr; - static thread_local RRDDIM *NewAnomalyEventRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "detector_events_on_" << localhost->machine_guid; - NameSS << "detector_events_on_" << rrdhost_hostname(localhost); - - RS = rrdset_create( - RH, - "anomaly_detection", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "detector_events", // family - "anomaly_detection.detector_events", // ctx - "Anomaly events triggered", // title - "boolean", // units - "netdata", // plugin - "ml", // module - 39186, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_LINE // chart_type - ); - rrdset_flag_set(RS, RRDSET_FLAG_ANOMALY_DETECTION); - - AboveThresholdRD = rrddim_add(RS, "above_threshold", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - ResetBitCounterRD = rrddim_add(RS, "reset_bit_counter", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - NewAnomalyEventRD = rrddim_add(RS, "new_anomaly_event", NULL, - 1, 1, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(RS); - - BitRateWindow::Edge E = P.first; - bool AboveThreshold = E.second == BitRateWindow::State::AboveThreshold; - - rrddim_set_by_pointer(RS, AboveThresholdRD, AboveThreshold); - rrddim_set_by_pointer(RS, ResetBitCounterRD, ResetBitCounter); - rrddim_set_by_pointer(RS, NewAnomalyEventRD, NewAnomalyEvent); - - rrdset_done(RS); -} - -static void updateDetectionChart(RRDHOST *RH) { - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *UserRD, *SystemRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "prediction_stats_" << RH->machine_guid; - NameSS << "prediction_stats_for_" << rrdhost_hostname(RH); - - RS = rrdset_create_localhost( - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "ml", // family - "netdata.prediction_stats", // ctx - "Prediction thread CPU usage", // title - "milliseconds/s", // units - "netdata", // plugin - "ml", // module - 136000, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_STACKED // chart_type - ); - - UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - } else - rrdset_next(RS); - - struct rusage TRU; - getrusage(RUSAGE_THREAD, &TRU); - - rrddim_set_by_pointer(RS, UserRD, TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec); - rrddim_set_by_pointer(RS, SystemRD, TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec); - rrdset_done(RS); -} - -static void updateTrainingChart(RRDHOST *RH, struct rusage *TRU) -{ - static thread_local RRDSET *RS = nullptr; - static thread_local RRDDIM *UserRD = nullptr; - static thread_local RRDDIM *SystemRD = nullptr; - - if (!RS) { - std::stringstream IdSS, NameSS; - - IdSS << "training_stats_" << RH->machine_guid; - NameSS << "training_stats_for_" << rrdhost_hostname(RH); - - RS = rrdset_create_localhost( - "netdata", // type - IdSS.str().c_str(), // id - NameSS.str().c_str(), // name - "ml", // family - "netdata.training_stats", // ctx - "Training thread CPU usage", // title - "milliseconds/s", // units - "netdata", // plugin - "ml", // module - 136001, // priority - RH->rrd_update_every, // update_every - RRDSET_TYPE_STACKED // chart_type - ); - - UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL); - } else - rrdset_next(RS); - - rrddim_set_by_pointer(RS, UserRD, TRU->ru_utime.tv_sec * 1000000ULL + TRU->ru_utime.tv_usec); - rrddim_set_by_pointer(RS, SystemRD, TRU->ru_stime.tv_sec * 1000000ULL + TRU->ru_stime.tv_usec); - rrdset_done(RS); -} - void RrdHost::addDimension(Dimension *D) { - RRDDIM *AnomalyRateRD = rrddim_add(AnomalyRateRS, D->getID().c_str(), NULL, - 1, 1000, RRD_ALGORITHM_ABSOLUTE); - D->setAnomalyRateRD(AnomalyRateRD); - - { - std::lock_guard Lock(Mutex); + std::lock_guard Lock(Mutex); - DimensionsMap[D->getRD()] = D; + DimensionsMap[D->getRD()] = D; - // Default construct mutex for dimension - LocksMap[D]; - } + // Default construct mutex for dimension + LocksMap[D]; } void RrdHost::removeDimension(Dimension *D) { @@ -312,18 +54,33 @@ void RrdHost::getConfigAsJson(nlohmann::json &Json) const { Json["max-kmeans-iters"] = Cfg.MaxKMeansIters; Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold; - Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold; - Json["min-window-size"] = Cfg.ADMinWindowSize; - Json["max-window-size"] = Cfg.ADMaxWindowSize; - Json["idle-window-size"] = Cfg.ADIdleWindowSize; - Json["window-rate-threshold"] = Cfg.ADWindowRateThreshold; - Json["dimension-rate-threshold"] = Cfg.ADDimensionRateThreshold; + Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold; + Json["anomaly-detection-grouping-method"] = group_method2string(Cfg.AnomalyDetectionGroupingMethod); + Json["anomaly-detection-query-duration"] = Cfg.AnomalyDetectionQueryDuration; Json["hosts-to-skip"] = Cfg.HostsToSkip; Json["charts-to-skip"] = Cfg.ChartsToSkip; } +void TrainableHost::getModelsAsJson(nlohmann::json &Json) { + std::lock_guard Lock(Mutex); + + for (auto &DP : DimensionsMap) { + Dimension *D = DP.second; + + nlohmann::json JsonArray = nlohmann::json::array(); + for (const KMeans &KM : D->getModels()) { + nlohmann::json J; + KM.toJson(J); + JsonArray.push_back(J); + } + Json[getMLDimensionID(D->getRD())] = JsonArray; + } + + return; +} + std::pair> TrainableHost::findDimensionToTrain(const TimePoint &NowTP) { std::lock_guard Lock(Mutex); @@ -393,23 +150,12 @@ void TrainableHost::train() { #define WORKER_JOB_UPDATE_DETECTION_CHART 1 #define WORKER_JOB_UPDATE_ANOMALY_RATES 2 #define WORKER_JOB_UPDATE_CHARTS 3 -#define WORKER_JOB_SAVE_ANOMALY_EVENT 4 #if WORKER_UTILIZATION_MAX_JOB_TYPES < 5 #error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5 #endif void DetectableHost::detectOnce() { - auto P = BRW.insert(WindowAnomalyRate >= Cfg.HostAnomalyRateThreshold); - BitRateWindow::Edge Edge = P.first; - size_t WindowLength = P.second; - - bool ResetBitCounter = (Edge.first != BitRateWindow::State::AboveThreshold); - bool NewAnomalyEvent = (Edge.first == BitRateWindow::State::AboveThreshold) && -