path: root/ml/Host.cc
author:    vkalintiris <vasilis@netdata.cloud>  2022-12-21 15:03:05 +0200
committer: GitHub <noreply@github.com>          2022-12-21 15:03:05 +0200
commit:    689dc6b7fbbf495ce3e020dcff0d014a8d338c52 (patch)
tree:      1f458a3218798b53809d0868a82fcad1a20b1e32 /ml/Host.cc
parent:    fe386aad57f24574783f4c68bab433a5cdfe6f64 (diff)
Refactor ML code and add support for multiple KMeans models. (#14065)
* Add profile.plugin: creates the specified number of charts/dimensions, and supports backfilling with pseudo-historical data.
* Bump
* Remove wrongly merged line.
* Use the number of models specified from the config section.
* Add option to consult all ML models.
* Remove profiling option consuming all models.
* Add underscore after chart name prefix.
* prediction -> dimensions chart
* reorder funcs
* Split charts across types with correct priority
* Ignore training request when chart is under replication.
* Track global number of models consulted.
* Cleanup config.
* initial readme updates
* fix readme
* readme
* Fix function definition when ML is disabled.
* Add dummy ml_chart_update_{begin,end}
* Remove profile_plugin
* Define chart priorities under collectors/all.h
* s/curr_t/current_time/

Co-authored-by: Andrew Maguire <andrewm4894@gmail.com>
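
Note: the "consult all ML models" bullet above refers to logic that lives outside Host.cc (in the Dimension/Chart code), so it is only summarized in this commit message. Below is a minimal, hypothetical sketch of the idea, assuming a dimension keeps several trained KMeans models and a sample is flagged anomalous only when every model agrees; the names (KMeansModel, isAnomalous) are illustrative, not Netdata's API.

    #include <algorithm>
    #include <cmath>
    #include <limits>
    #include <vector>

    struct KMeansModel {
        std::vector<std::vector<double>> Centers; // cluster centers learned at training time
        double AnomalyThreshold;                  // distance above which a sample looks anomalous

        // Euclidean distance from the sample to its nearest cluster center.
        double anomalyScore(const std::vector<double> &Sample) const {
            double Best = std::numeric_limits<double>::max();
            for (const auto &Center : Centers) {
                double Dist = 0.0;
                for (size_t I = 0; I != Sample.size(); I++)
                    Dist += (Sample[I] - Center[I]) * (Sample[I] - Center[I]);
                Best = std::min(Best, std::sqrt(Dist));
            }
            return Best;
        }
    };

    // With multiple models, a sample counts as anomalous only when every model
    // scores it above its own threshold; a single "normal" vote keeps it normal,
    // which reduces false positives compared to a single model.
    bool isAnomalous(const std::vector<KMeansModel> &Models,
                     const std::vector<double> &Sample) {
        if (Models.empty())
            return false;
        return std::all_of(Models.begin(), Models.end(), [&](const KMeansModel &KM) {
            return KM.anomalyScore(Sample) > KM.AnomalyThreshold;
        });
    }
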
Diffstat (limited to 'ml/Host.cc')
-rw-r--r--  ml/Host.cc  352
1 file changed, 194 insertions(+), 158 deletions(-)
diff --git a/ml/Host.cc b/ml/Host.cc
index 4a57178c76..d68c35233a 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -2,42 +2,24 @@
#include "Config.h"
#include "Host.h"
+#include "Queue.h"
#include "ADCharts.h"
#include "json/single_include/nlohmann/json.hpp"
using namespace ml;
-void RrdHost::addDimension(Dimension *D) {
+void Host::addChart(Chart *C) {
std::lock_guard<std::mutex> Lock(Mutex);
-
- DimensionsMap[D->getRD()] = D;
-
- // Default construct mutex for dimension
- LocksMap[D];
+ Charts[C->getRS()] = C;
}
-void RrdHost::removeDimension(Dimension *D) {
- // Remove the dimension from the hosts map.
- {
- std::lock_guard<std::mutex> Lock(Mutex);
- DimensionsMap.erase(D->getRD());
- }
-
- // Delete the dimension by locking the mutex that protects it.
- {
- std::lock_guard<std::mutex> Lock(LocksMap[D]);
- delete D;
- }
-
- // Remove the lock entry for the deleted dimension.
- {
- std::lock_guard<std::mutex> Lock(Mutex);
- LocksMap.erase(D);
- }
+void Host::removeChart(Chart *C) {
+ std::lock_guard<std::mutex> Lock(Mutex);
+ Charts.erase(C->getRS());
}
-void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
+void Host::getConfigAsJson(nlohmann::json &Json) const {
Json["version"] = 1;
Json["enabled"] = Cfg.EnableAnomalyDetection;
@@ -63,193 +45,247 @@ void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
Json["charts-to-skip"] = Cfg.ChartsToSkip;
}
-void TrainableHost::getModelsAsJson(nlohmann::json &Json) {
- std::lock_guard<std::mutex> Lock(Mutex);
-
- for (auto &DP : DimensionsMap) {
- Dimension *D = DP.second;
-
- nlohmann::json JsonArray = nlohmann::json::array();
- for (const KMeans &KM : D->getModels()) {
- nlohmann::json J;
- KM.toJson(J);
- JsonArray.push_back(J);
- }
- Json[getMLDimensionID(D->getRD())] = JsonArray;
- }
-
- return;
-}
-
-std::pair<Dimension *, Duration<double>>
-TrainableHost::findDimensionToTrain(const TimePoint &NowTP) {
+void Host::getModelsAsJson(nlohmann::json &Json) {
std::lock_guard<std::mutex> Lock(Mutex);
- Duration<double> AllottedDuration = Duration<double>{Cfg.TrainEvery * updateEvery()} / (DimensionsMap.size() + 1);
-
- for (auto &DP : DimensionsMap) {
- Dimension *D = DP.second;
-
- if (D->shouldTrain(NowTP)) {
- LocksMap[D].lock();
- return { D, AllottedDuration };
- }
+ for (auto &CP : Charts) {
+ Chart *C = CP.second;
+ C->getModelsAsJson(Json);
}
-
- return { nullptr, AllottedDuration };
}
-void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
- if (D == nullptr)
- return;
-
- D->LastTrainedAt = NowTP + Seconds{D->updateEvery()};
- D->trainModel();
+void Host::detectOnce() {
+ MLS = {};
+ MachineLearningStats MLSCopy = {};
+ TrainingStats TSCopy = {};
{
std::lock_guard<std::mutex> Lock(Mutex);
- LocksMap[D].unlock();
- }
-}
-
-void TrainableHost::train() {
- Duration<double> MaxSleepFor = Seconds{10 * updateEvery()};
- worker_register("MLTRAIN");
- worker_register_job_name(0, "dimensions");
+ /*
+ * prediction/detection stats
+ */
+ for (auto &CP : Charts) {
+ Chart *C = CP.second;
- worker_is_busy(0);
- while (!netdata_exit) {
- netdata_thread_testcancel();
- netdata_thread_disable_cancelability();
+ if (!C->isAvailableForML())
+ continue;
- updateResourceUsage();
+ MachineLearningStats ChartMLS = C->getMLS();
- TimePoint NowTP = SteadyClock::now();
+ MLS.NumMachineLearningStatusEnabled += ChartMLS.NumMachineLearningStatusEnabled;
+ MLS.NumMachineLearningStatusDisabledUE += ChartMLS.NumMachineLearningStatusDisabledUE;
+ MLS.NumMachineLearningStatusDisabledSP += ChartMLS.NumMachineLearningStatusDisabledSP;
- auto P = findDimensionToTrain(NowTP);
- trainDimension(P.first, NowTP);
+ MLS.NumMetricTypeConstant += ChartMLS.NumMetricTypeConstant;
+ MLS.NumMetricTypeVariable += ChartMLS.NumMetricTypeVariable;
- netdata_thread_enable_cancelability();
+ MLS.NumTrainingStatusUntrained += ChartMLS.NumTrainingStatusUntrained;
+ MLS.NumTrainingStatusPendingWithoutModel += ChartMLS.NumTrainingStatusPendingWithoutModel;
+ MLS.NumTrainingStatusTrained += ChartMLS.NumTrainingStatusTrained;
+ MLS.NumTrainingStatusPendingWithModel += ChartMLS.NumTrainingStatusPendingWithModel;
- Duration<double> AllottedDuration = P.second;
- Duration<double> RealDuration = SteadyClock::now() - NowTP;
+ MLS.NumAnomalousDimensions += ChartMLS.NumAnomalousDimensions;
+ MLS.NumNormalDimensions += ChartMLS.NumNormalDimensions;
+ }
- Duration<double> SleepFor;
- if (RealDuration >= AllottedDuration)
- continue;
+ HostAnomalyRate = 0.0;
+ size_t NumActiveDimensions = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions;
+ if (NumActiveDimensions)
+ HostAnomalyRate = static_cast<double>(MLS.NumAnomalousDimensions) / NumActiveDimensions;
- worker_is_idle();
- SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor);
- TimePoint Now = SteadyClock::now();
- auto Until = Now + SleepFor;
- while (Now < Until && !netdata_exit) {
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
- Now = SteadyClock::now();
- }
- worker_is_busy(0);
- }
-}
+ MLSCopy = MLS;
-#define WORKER_JOB_DETECT_DIMENSION 0
-#define WORKER_JOB_UPDATE_DETECTION_CHART 1
-#define WORKER_JOB_UPDATE_ANOMALY_RATES 2
-#define WORKER_JOB_UPDATE_CHARTS 3
+ /*
+ * training stats
+ */
+ TSCopy = TS;
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 5
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5
-#endif
+ TS.QueueSize = 0;
+ TS.NumPoppedItems = 0;
-void DetectableHost::detectOnce() {
- size_t NumAnomalousDimensions = 0;
- size_t NumNormalDimensions = 0;
- size_t NumTrainedDimensions = 0;
- size_t NumActiveDimensions = 0;
+ TS.AllottedUT = 0;
+ TS.ConsumedUT = 0;
+ TS.RemainingUT = 0;
- {
- std::lock_guard<std::mutex> Lock(Mutex);
+ TS.TrainingResultOk = 0;
+ TS.TrainingResultInvalidQueryTimeRange = 0;
+ TS.TrainingResultNotEnoughCollectedValues = 0;
+ TS.TrainingResultNullAcquiredDimension = 0;
+ TS.TrainingResultChartUnderReplication = 0;
+ }
- for (auto &DP : DimensionsMap) {
- worker_is_busy(WORKER_JOB_DETECT_DIMENSION);
+ // Calc the avg values
+ if (TSCopy.NumPoppedItems) {
+ TSCopy.QueueSize /= TSCopy.NumPoppedItems;
+ TSCopy.AllottedUT /= TSCopy.NumPoppedItems;
+ TSCopy.ConsumedUT /= TSCopy.NumPoppedItems;
+ TSCopy.RemainingUT /= TSCopy.NumPoppedItems;
+
+ TSCopy.TrainingResultOk /= TSCopy.NumPoppedItems;
+ TSCopy.TrainingResultInvalidQueryTimeRange /= TSCopy.NumPoppedItems;
+ TSCopy.TrainingResultNotEnoughCollectedValues /= TSCopy.NumPoppedItems;
+ TSCopy.TrainingResultNullAcquiredDimension /= TSCopy.NumPoppedItems;
+ TSCopy.TrainingResultChartUnderReplication /= TSCopy.NumPoppedItems;
+ } else {
+ TSCopy.QueueSize = 0;
+ TSCopy.AllottedUT = 0;
+ TSCopy.ConsumedUT = 0;
+ TSCopy.RemainingUT = 0;
+ }
- Dimension *D = DP.second;
+ updateDimensionsChart(RH, MLSCopy);
+ updateHostAndDetectionRateCharts(RH, HostAnomalyRate * 10000.0);
- if (!D->isActive())
- continue;
+ struct rusage PredictionRU;
+ getrusage(RUSAGE_THREAD, &PredictionRU);
+ updateResourceUsageCharts(RH, PredictionRU, TSCopy.TrainingRU);
- NumActiveDimensions++;
- NumTrainedDimensions += D->isTrained();
+ updateTrainingStatisticsChart(RH, TSCopy);
+}
- bool IsAnomalous = D->isAnomalous();
- if (IsAnomalous)
- NumAnomalousDimensions += 1;
+class AcquiredDimension {
+public:
+ static AcquiredDimension find(RRDHOST *RH, STRING *ChartId, STRING *DimensionId) {
+ RRDDIM_ACQUIRED *AcqRD = nullptr;
+ Dimension *D = nullptr;
+
+ rrdhost_rdlock(RH);
+ RRDSET *RS = rrdset_find(RH, string2str(ChartId));
+ if (RS) {
+ AcqRD = rrddim_find_and_acquire(RS, string2str(DimensionId));
+ if (AcqRD) {
+ RRDDIM *RD = rrddim_acquired_to_rrddim(AcqRD);
+ if (RD)
+ D = reinterpret_cast<Dimension *>(RD->ml_dimension);
+ }
}
+ rrdhost_unlock(RH);
+
+ return AcquiredDimension(AcqRD, D);
+ }
- if (NumAnomalousDimensions)
- HostAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions;
- else
- HostAnomalyRate = 0.0;
+private:
+ AcquiredDimension(RRDDIM_ACQUIRED *AcqRD, Dimension *D) : AcqRD(AcqRD), D(D) {}
- NumNormalDimensions = NumActiveDimensions - NumAnomalousDimensions;
+public:
+ TrainingResult train(const TrainingRequest &TR) {
+ if (!D)
+ return TrainingResult::NullAcquiredDimension;
+
+ return D->trainModel(TR);
}
- this->NumAnomalousDimensions = NumAnomalousDimensions;
- this->NumNormalDimensions = NumNormalDimensions;
- this->NumTrainedDimensions = NumTrainedDimensions;
- this->NumActiveDimensions = NumActiveDimensions;
+ ~AcquiredDimension() {
+ if (AcqRD)
+ rrddim_acquired_release(AcqRD);
+ }
- worker_is_busy(WORKER_JOB_UPDATE_CHARTS);
- updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions);
- updateHostAndDetectionRateCharts(getRH(), HostAnomalyRate * 10000.0);
+private:
+ RRDDIM_ACQUIRED *AcqRD;
+ Dimension *D;
+};
- struct rusage TRU;
- getResourceUsage(&TRU);
- updateTrainingChart(getRH(), &TRU);
+void Host::scheduleForTraining(TrainingRequest TR) {
+ TrainingQueue.push(TR);
}
-void DetectableHost::detect() {
- worker_register("MLDETECT");
- worker_register_job_name(WORKER_JOB_DETECT_DIMENSION, "dimensions");
- worker_register_job_name(WORKER_JOB_UPDATE_DETECTION_CHART, "detection chart");
- worker_register_job_name(WORKER_JOB_UPDATE_ANOMALY_RATES, "anomaly rates");
- worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "charts");
+void Host::train() {
+ while (!netdata_exit) {
+ netdata_thread_disable_cancelability();
- std::this_thread::sleep_for(Seconds{10});
+ auto P = TrainingQueue.pop();
+ TrainingRequest TrainingReq = P.first;
+ size_t Size = P.second;
+
+ usec_t AllottedUT = (Cfg.TrainEvery * RH->rrd_update_every * USEC_PER_SEC) / Size;
+ if (AllottedUT > USEC_PER_SEC)
+ AllottedUT = USEC_PER_SEC;
+
+ usec_t StartUT = now_realtime_usec();
+ TrainingResult TrainingRes;
+ {
+ AcquiredDimension AcqDim = AcquiredDimension::find(RH, TrainingReq.ChartId, TrainingReq.DimensionId);
+ TrainingRes = AcqDim.train(TrainingReq);
+ string_freez(TrainingReq.ChartId);
+ string_freez(TrainingReq.DimensionId);
+ }
+ usec_t ConsumedUT = now_realtime_usec() - StartUT;
+
+ usec_t RemainingUT = 0;
+ if (ConsumedUT < AllottedUT)
+ RemainingUT = AllottedUT - ConsumedUT;
+
+ {
+ std::lock_guard<std::mutex> Lock(Mutex);
+
+ if (TS.AllottedUT == 0) {
+ struct rusage TRU;
+ getrusage(RUSAGE_THREAD, &TRU);
+ TS.TrainingRU = TRU;
+ }
+
+ TS.QueueSize += Size;
+ TS.NumPoppedItems += 1;
+
+ TS.AllottedUT += AllottedUT;
+ TS.ConsumedUT += ConsumedUT;
+ TS.RemainingUT += RemainingUT;
+
+ switch (TrainingRes) {
+ case TrainingResult::Ok:
+ TS.TrainingResultOk += 1;
+ break;
+ case TrainingResult::InvalidQueryTimeRange:
+ TS.TrainingResultInvalidQueryTimeRange += 1;
+ break;
+ case TrainingResult::NotEnoughCollectedValues:
+ TS.TrainingResultNotEnoughCollectedValues += 1;
+ break;
+ case TrainingResult::NullAcquiredDimension:
+ TS.TrainingResultNullAcquiredDimension += 1;
+ break;
+ case TrainingResult::ChartUnderReplication:
+ TS.TrainingResultChartUnderReplication += 1;
+ break;
+ }
+ }
+ netdata_thread_enable_cancelability();
+ std::this_thread::sleep_for(std::chrono::microseconds{RemainingUT});
+ }
+}
+
+void Host::detect() {
heartbeat_t HB;
heartbeat_init(&HB);
while (!netdata_exit) {
- netdata_thread_testcancel();
- worker_is_idle();
- heartbeat_next(&HB, updateEvery() * USEC_PER_SEC);
+ heartbeat_next(&HB, RH->rrd_update_every * USEC_PER_SEC);
netdata_thread_disable_cancelability();
detectOnce();
-
- worker_is_busy(WORKER_JOB_UPDATE_DETECTION_CHART);
- updateDetectionChart(getRH());
netdata_thread_enable_cancelability();
}
}
-void DetectableHost::getDetectionInfoAsJson(nlohmann::json &Json) const {
+void Host::getDetectionInfoAsJson(nlohmann::json &Json) const {
Json["version"] = 1;
- Json["anomalous-dimensions"] = NumAnomalousDimensions;
- Json["normal-dimensions"] = NumNormalDimensions;
- Json["total-dimensions"] = NumAnomalousDimensions + NumNormalDimensions;
- Json["trained-dimensions"] = NumTrainedDimensions;
+ Json["anomalous-dimensions"] = MLS.NumAnomalousDimensions;
+ Json["normal-dimensions"] = MLS.NumNormalDimensions;
+ Json["total-dimensions"] = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions;
+ Json["trained-dimensions"] = MLS.NumTrainingStatusTrained + MLS.NumTrainingStatusPendingWithModel;
}
-void DetectableHost::startAnomalyDetectionThreads() {
- TrainingThread = std::thread(&TrainableHost::train, this);
- DetectionThread = std::thread(&DetectableHost::detect, this);
+void Host::startAnomalyDetectionThreads() {
+ TrainingThread = std::thread(&Host::train, this);
+ DetectionThread = std::thread(&Host::detect, this);
}
-void DetectableHost::stopAnomalyDetectionThreads() {
+void Host::stopAnomalyDetectionThreads() {
netdata_thread_cancel(TrainingThread.native_handle());
- netdata_thread_cancel(DetectionThread.native_handle());
-
TrainingThread.join();
+
+ netdata_thread_cancel(DetectionThread.native_handle());
DetectionThread.join();
}