summaryrefslogtreecommitdiffstats
path: root/ml/Host.cc
diff options
context:
space:
mode:
author vkalintiris <vasilis@netdata.cloud> 2023-01-04 14:51:25 +0200
committer GitHub <noreply@github.com> 2023-01-04 14:51:25 +0200
commit 78359cd375d0b2c285741e6f934a681d0a0c3c15 (patch)
tree 2d5264325510b663d9e87ca62a38fad187e3a713 /ml/Host.cc
parent df379e45fbaddf825f1f7972a75ae3f3daf80097 (diff)
Refactor ML code and add support for multiple KMeans models (#14198)
* Add profile.plugin. Creates the specified number of charts/dimensions, and supports backfilling with pseudo-historical data.
* Bump
* Remove wrongly merged line.
* Use the number of models specified from the config section.
* Add option to consult all ML models.
* Remove profiling option consuming all models.
* Add underscore after chart name prefix.
* prediction -> dimensions chart
* reorder funcs
* Split charts across types with correct priority
* Ignore training request when chart is under replication.
* Track global number of models consulted.
* Cleanup config.
* initial readme updates
* fix readme
* readme
* Fix function definition when ML is disabled.
* Add dummy ml_chart_update_{begin,end}
* Remove profile_plugin
* Define chart priorities under collectors/all.h
* s/curr_t/current_time/
* Use libnetdata's lock/thread wrappers.
* Fix autotools & cmake builds.
* Delete ML dimensions & charts.
* Let users of buffer preprocessing handle memory.
* Add separate API calls to start/stop ML threads.

Co-authored-by: Andrew Maguire <andrewm4894@gmail.com>
Diffstat (limited to 'ml/Host.cc')
-rw-r--r--ml/Host.cc393
1 files changed, 228 insertions, 165 deletions
diff --git a/ml/Host.cc b/ml/Host.cc
index 4a57178c76..fcf97b2b8d 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -2,42 +2,24 @@
#include "Config.h"
#include "Host.h"
+#include "Queue.h"
#include "ADCharts.h"
#include "json/single_include/nlohmann/json.hpp"
using namespace ml;
-void RrdHost::addDimension(Dimension *D) {
- std::lock_guard<std::mutex> Lock(Mutex);
-
- DimensionsMap[D->getRD()] = D;
-
- // Default construct mutex for dimension
- LocksMap[D];
+// Registers chart C with this host: C is stored in Charts under the key
+// returned by C->getRS(). The host mutex M is held for the update.
+void Host::addChart(Chart *C) {
+    std::lock_guard<Mutex> Guard(M);
+    auto Key = C->getRS();
+    Charts[Key] = C;
}
-void RrdHost::removeDimension(Dimension *D) {
- // Remove the dimension from the hosts map.
- {
- std::lock_guard<std::mutex> Lock(Mutex);
- DimensionsMap.erase(D->getRD());
- }
-
- // Delete the dimension by locking the mutex that protects it.
- {
- std::lock_guard<std::mutex> Lock(LocksMap[D]);
- delete D;
- }
-
- // Remove the lock entry for the deleted dimension.
- {
- std::lock_guard<std::mutex> Lock(Mutex);
- LocksMap.erase(D);
- }
+// Unregisters chart C from this host by erasing the Charts entry keyed by
+// C->getRS(). The host mutex M is held for the update.
+void Host::removeChart(Chart *C) {
+    std::lock_guard<Mutex> Guard(M);
+    auto Key = C->getRS();
+    Charts.erase(Key);
}
-void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
+void Host::getConfigAsJson(nlohmann::json &Json) const {
Json["version"] = 1;
Json["enabled"] = Cfg.EnableAnomalyDetection;
@@ -63,193 +45,274 @@ void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
Json["charts-to-skip"] = Cfg.ChartsToSkip;
}
-void TrainableHost::getModelsAsJson(nlohmann::json &Json) {
- std::lock_guard<std::mutex> Lock(Mutex);
+void Host::getModelsAsJson(nlohmann::json &Json) { // Asks every chart registered in Charts to add its models to Json.
+ std::lock_guard<Mutex> L(M); // hold the host mutex while iterating over Charts
- for (auto &DP : DimensionsMap) {
- Dimension *D = DP.second;
-
- nlohmann::json JsonArray = nlohmann::json::array();
- for (const KMeans &KM : D->getModels()) {
- nlohmann::json J;
- KM.toJson(J);
- JsonArray.push_back(J);
- }
- Json[getMLDimensionID(D->getRD())] = JsonArray;
+ for (auto &CP : Charts) { // CP.second is the Chart pointer stored by addChart()
+ Chart *C = CP.second;
+ C->getModelsAsJson(Json); // serialization is delegated to the chart; all charts write into the same Json object
}
-
- return;
}
-std::pair<Dimension *, Duration<double>>
-TrainableHost::findDimensionToTrain(const TimePoint &NowTP) {
- std::lock_guard<std::mutex> Lock(Mutex);
-
- Duration<double> AllottedDuration = Duration<double>{Cfg.TrainEvery * updateEvery()} / (DimensionsMap.size() + 1);
-
- for (auto &DP : DimensionsMap) {
- Dimension *D = DP.second;
+void Host::detectOnce() { // One detection pass: aggregate per-chart ML stats, snapshot and reset training stats, then update the ML charts.
+ MLS = {}; // host-level aggregate is cleared here, before the mutex below is taken
+ MachineLearningStats MLSCopy = {}; // local snapshots, used for chart updates after the lock is released
+ TrainingStats TSCopy = {};
- if (D->shouldTrain(NowTP)) {
- LocksMap[D].lock();
- return { D, AllottedDuration };
- }
- }
+ { // scope of the host mutex
+ std::lock_guard<Mutex> L(M);
- return { nullptr, AllottedDuration };
-}
+ /*
+ * prediction/detection stats
+ */
+ for (auto &CP : Charts) {
+ Chart *C = CP.second;
-void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
- if (D == nullptr)
- return;
+ if (!C->isAvailableForML()) // charts not available for ML contribute nothing to the totals
+ continue;
- D->LastTrainedAt = NowTP + Seconds{D->updateEvery()};
- D->trainModel();
+ MachineLearningStats ChartMLS = C->getMLS(); // per-chart counters, summed field by field into MLS below
- {
- std::lock_guard<std::mutex> Lock(Mutex);
- LocksMap[D].unlock();
- }
-}
-
-void TrainableHost::train() {
- Duration<double> MaxSleepFor = Seconds{10 * updateEvery()};
+ MLS.NumMachineLearningStatusEnabled += ChartMLS.NumMachineLearningStatusEnabled;
+ MLS.NumMachineLearningStatusDisabledUE += ChartMLS.NumMachineLearningStatusDisabledUE;
+ MLS.NumMachineLearningStatusDisabledSP += ChartMLS.NumMachineLearningStatusDisabledSP;
- worker_register("MLTRAIN");
- worker_register_job_name(0, "dimensions");
+ MLS.NumMetricTypeConstant += ChartMLS.NumMetricTypeConstant;
+ MLS.NumMetricTypeVariable += ChartMLS.NumMetricTypeVariable;
- worker_is_busy(0);
- while (!netdata_exit) {
- netdata_thread_testcancel();
- netdata_thread_disable_cancelability();
+ MLS.NumTrainingStatusUntrained += ChartMLS.NumTrainingStatusUntrained;
+ MLS.NumTrainingStatusPendingWithoutModel += ChartMLS.NumTrainingStatusPendingWithoutModel;
+ MLS.NumTrainingStatusTrained += ChartMLS.NumTrainingStatusTrained;
+ MLS.NumTrainingStatusPendingWithModel += ChartMLS.NumTrainingStatusPendingWithModel;
- updateResourceUsage();
+ MLS.NumAnomalousDimensions += ChartMLS.NumAnomalousDimensions;
+ MLS.NumNormalDimensions += ChartMLS.NumNormalDimensions;
+ }
- TimePoint NowTP = SteadyClock::now();
+ HostAnomalyRate = 0.0; // stays 0 when there are no anomalous or normal dimensions
+ size_t NumActiveDimensions = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions;
+ if (NumActiveDimensions) // guards the division below
+ HostAnomalyRate = static_cast<double>(MLS.NumAnomalousDimensions) / NumActiveDimensions;
- auto P = findDimensionToTrain(NowTP);
- trainDimension(P.first, NowTP);
+ MLSCopy = MLS;
- netdata_thread_enable_cancelability();
+ /*
+ * training stats
+ */
+ TSCopy = TS; // snapshot what train() accumulated since the previous pass
- Duration<double> AllottedDuration = P.second;
- Duration<double> RealDuration = SteadyClock::now() - NowTP;
+ TS.QueueSize = 0; // reset the accumulators for the next pass; TS.TrainingRU is not reset here
+ TS.NumPoppedItems = 0;
- Duration<double> SleepFor;
- if (RealDuration >= AllottedDuration)
- continue;
+ TS.AllottedUT = 0; // train() samples TrainingRU again when it sees AllottedUT == 0
+ TS.ConsumedUT = 0;
+ TS.RemainingUT = 0;
- worker_is_idle();
- SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor);
- TimePoint Now = SteadyClock::now();
- auto Until = Now + SleepFor;
- while (Now < Until && !netdata_exit) {
- std::this_thread::sleep_for(std::chrono::milliseconds(1000));
- Now = SteadyClock::now();
- }
- worker_is_busy(0);
+ TS.TrainingResultOk = 0;
+ TS.TrainingResultInvalidQueryTimeRange = 0;
+ TS.TrainingResultNotEnoughCollectedValues = 0;
+ TS.TrainingResultNullAcquiredDimension = 0;
+ TS.TrainingResultChartUnderReplication = 0;
}
-}
-
-#define WORKER_JOB_DETECT_DIMENSION 0
-#define WORKER_JOB_UPDATE_DETECTION_CHART 1
-#define WORKER_JOB_UPDATE_ANOMALY_RATES 2
-#define WORKER_JOB_UPDATE_CHARTS 3
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 5
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 5
-#endif
-
-void DetectableHost::detectOnce() {
- size_t NumAnomalousDimensions = 0;
- size_t NumNormalDimensions = 0;
- size_t NumTrainedDimensions = 0;
- size_t NumActiveDimensions = 0;
+ // Calc the avg values
+ if (TSCopy.NumPoppedItems) { // every accumulated field is divided by the number of popped requests
+ TSCopy.QueueSize /= TSCopy.NumPoppedItems;
+ TSCopy.AllottedUT /= TSCopy.NumPoppedItems;
+ TSCopy.ConsumedUT /= TSCopy.NumPoppedItems;
+ TSCopy.RemainingUT /= TSCopy.NumPoppedItems;
+
+ TSCopy.TrainingResultOk /= TSCopy.NumPoppedItems; // NOTE(review): the result counters are divided too; if these fields are integers this truncates - confirm this is intended
+ TSCopy.TrainingResultInvalidQueryTimeRange /= TSCopy.NumPoppedItems;
+ TSCopy.TrainingResultNotEnoughCollectedValues /= TSCopy.NumPoppedItems;
+ TSCopy.TrainingResultNullAcquiredDimension /= TSCopy.NumPoppedItems;
+ TSCopy.TrainingResultChartUnderReplication /= TSCopy.NumPoppedItems;
+ } else { // nothing was popped since the previous pass
+ TSCopy.QueueSize = 0;
+ TSCopy.AllottedUT = 0;
+ TSCopy.ConsumedUT = 0;
+ TSCopy.RemainingUT = 0;
+ }
- {
- std::lock_guard<std::mutex> Lock(Mutex);
+ updateDimensionsChart(RH, MLSCopy); // chart updates use the local copies, outside the locked scope
+ updateHostAndDetectionRateCharts(RH, HostAnomalyRate * 10000.0); // the rate (0..1) is passed scaled by 10000
- for (auto &DP : DimensionsMap) {
- worker_is_busy(WORKER_JOB_DETECT_DIMENSION);
+ struct rusage PredictionRU;
+ getrusage(RUSAGE_THREAD, &PredictionRU); // resource usage of the calling thread
+ updateResourceUsageCharts(RH, PredictionRU, TSCopy.TrainingRU); // TrainingRU is the sample recorded by train()
- Dimension *D = DP.second;
+ updateTrainingStatisticsChart(RH, TSCopy);
+}
- if (!D->isActive())
- continue;
+// Scoped handle over a dimension acquired with rrddim_find_and_acquire().
+//
+// find() performs the lookup/acquire and the destructor hands the acquired
+// reference back through rrddim_acquired_release(). Because the destructor
+// releases AcqRD, the handle is move-only: a copy would make two objects
+// release the same reference.
+class AcquiredDimension {
+public:
+    // Looks up chart ChartId on host RH and tries to acquire DimensionId from it.
+    // The lookup runs under the host read lock. When the chart or the dimension
+    // cannot be found, the returned handle holds null pointers and train()
+    // reports TrainingResult::NullAcquiredDimension.
+    static AcquiredDimension find(RRDHOST *RH, STRING *ChartId, STRING *DimensionId) {
+        RRDDIM_ACQUIRED *AcqRD = nullptr;
+        Dimension *D = nullptr;
+
+        rrdhost_rdlock(RH);
+        RRDSET *RS = rrdset_find(RH, string2str(ChartId));
+        if (RS) {
+            AcqRD = rrddim_find_and_acquire(RS, string2str(DimensionId));
+            if (AcqRD) {
+                RRDDIM *RD = rrddim_acquired_to_rrddim(AcqRD);
+                if (RD)
+                    D = reinterpret_cast<Dimension *>(RD->ml_dimension);
+            }
+        }
+        rrdhost_unlock(RH);
- NumActiveDimensions++;
- NumTrainedDimensions += D->isTrained();
+        return AcquiredDimension(AcqRD, D);
}
- bool IsAnomalous = D->isAnomalous();
- if (IsAnomalous)
- NumAnomalousDimensions += 1;
- }
+private:
+    // Only find() creates handles that own an acquired reference.
+    AcquiredDimension(RRDDIM_ACQUIRED *AcqRD, Dimension *D) : AcqRD(AcqRD), D(D) {}
- if (NumAnomalousDimensions)
- HostAnomalyRate = static_cast<double>(NumAnomalousDimensions) / NumActiveDimensions;
- else
- HostAnomalyRate = 0.0;
+public:
+    // Copying is disabled so the acquired reference is released exactly once.
+    AcquiredDimension(const AcquiredDimension &) = delete;
+    AcquiredDimension &operator=(const AcquiredDimension &) = delete;
+    AcquiredDimension &operator=(AcquiredDimension &&) = delete;
+
+    // Moving transfers ownership; the moved-from handle is left empty so its
+    // destructor does nothing.
+    AcquiredDimension(AcquiredDimension &&Other) noexcept : AcqRD(Other.AcqRD), D(Other.D) {
+        Other.AcqRD = nullptr;
+        Other.D = nullptr;
+    }
+
+    // Trains the wrapped dimension with request TR. Returns
+    // TrainingResult::NullAcquiredDimension when no dimension was found,
+    // otherwise whatever Dimension::trainModel() returns.
+    TrainingResult train(const TrainingRequest &TR) {
+        if (!D)
+            return TrainingResult::NullAcquiredDimension;
- NumNormalDimensions = NumActiveDimensions - NumAnomalousDimensions;
+        return D->trainModel(TR);
}
- this->NumAnomalousDimensions = NumAnomalousDimensions;
- this->NumNormalDimensions = NumNormalDimensions;
- this->NumTrainedDimensions = NumTrainedDimensions;
- this->NumActiveDimensions = NumActiveDimensions;
+    // Releases the acquired reference, if one is held.
+    ~AcquiredDimension() {
+        if (AcqRD)
+            rrddim_acquired_release(AcqRD);
}
- worker_is_busy(WORKER_JOB_UPDATE_CHARTS);
- updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions);
- updateHostAndDetectionRateCharts(getRH(), HostAnomalyRate * 10000.0);
+private:
+    RRDDIM_ACQUIRED *AcqRD; // acquired reference; null when the lookup failed or after a move
+    Dimension *D;           // ML dimension taken from RD->ml_dimension; may be null
+};
- struct rusage TRU;
- getResourceUsage(&TRU);
- updateTrainingChart(getRH(), &TRU);
+// Enqueues training request TR on this host's TrainingQueue, from which
+// Host::train() pops its work.
+void Host::scheduleForTraining(TrainingRequest TR) {
+    this->TrainingQueue.push(TR);
}
-void DetectableHost::detect() {
- worker_register("MLDETECT");
- worker_register_job_name(WORKER_JOB_DETECT_DIMENSION, "dimensions");
- worker_register_job_name(WORKER_JOB_UPDATE_DETECTION_CHART, "detection chart");
- worker_register_job_name(WORKER_JOB_UPDATE_ANOMALY_RATES, "anomaly rates");
- worker_register_job_name(WORKER_JOB_UPDATE_CHARTS, "charts");
+// Training loop of the host; runs until netdata_exit is set.
+//
+// Each iteration pops one request from TrainingQueue, trains the referenced
+// dimension, accumulates the outcome in TS under the host mutex, and then
+// sleeps for whatever is left of the time budget allotted to the request.
+//
+// Elapsed time is measured with std::chrono::steady_clock rather than the
+// wall clock: a backwards step of the realtime clock would otherwise make the
+// unsigned subtraction wrap and report an enormous ConsumedUT.
+void Host::train() {
+    while (!netdata_exit) {
+        auto P = TrainingQueue.pop();
+        TrainingRequest TrainingReq = P.first;
+        // NOTE(review): Size is used as a divisor right below; this assumes
+        // pop() never reports a size of 0 - confirm against Queue.h.
+        size_t Size = P.second;
+
+        // Spread the training period across the queued requests, capped at one
+        // second per request.
+        usec_t AllottedUT = (Cfg.TrainEvery * RH->rrd_update_every * USEC_PER_SEC) / Size;
+        if (AllottedUT > USEC_PER_SEC)
+            AllottedUT = USEC_PER_SEC;
+
+        const auto StartTP = std::chrono::steady_clock::now();
+        TrainingResult TrainingRes;
+        {
+            // The acquired dimension is released at the end of this scope.
+            AcquiredDimension AcqDim = AcquiredDimension::find(RH, TrainingReq.ChartId, TrainingReq.DimensionId);
+            TrainingRes = AcqDim.train(TrainingReq);
+            string_freez(TrainingReq.ChartId);
+            string_freez(TrainingReq.DimensionId);
+        }
+        const auto ElapsedTime = std::chrono::steady_clock::now() - StartTP;
+        usec_t ConsumedUT = static_cast<usec_t>(
+            std::chrono::duration_cast<std::chrono::microseconds>(ElapsedTime).count());
+
+        usec_t RemainingUT = 0;
+        if (ConsumedUT < AllottedUT)
+            RemainingUT = AllottedUT - ConsumedUT;
+
+        {
+            std::lock_guard<Mutex> L(M);
+
+            // AllottedUT == 0 means detectOnce() has reset the accumulators (or
+            // nothing was recorded yet): sample this thread's resource usage
+            // once per accumulation window.
+            if (TS.AllottedUT == 0) {
+                struct rusage TRU;
+                getrusage(RUSAGE_THREAD, &TRU);
+                TS.TrainingRU = TRU;
+            }
+
+            TS.QueueSize += Size;
+            TS.NumPoppedItems += 1;
+
+            TS.AllottedUT += AllottedUT;
+            TS.ConsumedUT += ConsumedUT;
+            TS.RemainingUT += RemainingUT;
+
+            switch (TrainingRes) {
+                case TrainingResult::Ok:
+                    TS.TrainingResultOk += 1;
+                    break;
+                case TrainingResult::InvalidQueryTimeRange:
+                    TS.TrainingResultInvalidQueryTimeRange += 1;
+                    break;
+                case TrainingResult::NotEnoughCollectedValues:
+                    TS.TrainingResultNotEnoughCollectedValues += 1;
+                    break;
+                case TrainingResult::NullAcquiredDimension:
+                    TS.TrainingResultNullAcquiredDimension += 1;
+                    break;
+                case TrainingResult::ChartUnderReplication:
+                    TS.TrainingResultChartUnderReplication += 1;
+                    break;
+            }
+        }
- std::this_thread::sleep_for(Seconds{10});
+        // Sleep off the unused part of the budget (0 when training overran it).
+        std::this_thread::sleep_for(std::chrono::microseconds{RemainingUT});
}
}
+void Host::detect() { // Detection loop: calls detectOnce() after every heartbeat_next() until netdata_exit is set.
heartbeat_t HB;
heartbeat_init(&HB);
while (!netdata_exit) {
- netdata_thread_testcancel();
- worker_is_idle();
- heartbeat_next(&HB, updateEvery() * USEC_PER_SEC);
-
- netdata_thread_disable_cancelability();
+ heartbeat_next(&HB, RH->rrd_update_every * USEC_PER_SEC); // interval is the host's rrd_update_every multiplied by USEC_PER_SEC; presumably blocks until the next tick - confirm
detectOnce();
-
- worker_is_busy(WORKER_JOB_UPDATE_DETECTION_CHART);
- updateDetectionChart(getRH());
- netdata_thread_enable_cancelability();
}
}
-void DetectableHost::getDetectionInfoAsJson(nlohmann::json &Json) const {
+void Host::getDetectionInfoAsJson(nlohmann::json &Json) const { // Writes the host-level dimension counters from MLS into Json.
Json["version"] = 1;
- Json["anomalous-dimensions"] = NumAnomalousDimensions;
- Json["normal-dimensions"] = NumNormalDimensions;
- Json["total-dimensions"] = NumAnomalousDimensions + NumNormalDimensions;
- Json["trained-dimensions"] = NumTrainedDimensions;
+ Json["anomalous-dimensions"] = MLS.NumAnomalousDimensions; // MLS is read here without taking the host mutex
+ Json["normal-dimensions"] = MLS.NumNormalDimensions;
+ Json["total-dimensions"] = MLS.NumAnomalousDimensions + MLS.NumNormalDimensions; // total = anomalous + normal
+ Json["trained-dimensions"] = MLS.NumTrainingStatusTrained + MLS.NumTrainingStatusPendingWithModel; // "trained" includes dimensions pending re-training that already have a model
+}
+
+// Thread entry point for training: Arg is the Host whose train() loop is run.
+// Always returns nullptr.
+void *train_main(void *Arg) {
+    auto *TargetHost = static_cast<Host *>(Arg);
+    TargetHost->train();
+    return nullptr;
}
-void DetectableHost::startAnomalyDetectionThreads() {
- TrainingThread = std::thread(&TrainableHost::train, this);
- DetectionThread = std::thread(&DetectableHost::detect, this);
+// Thread entry point for detection: Arg is the Host whose detect() loop is run.
+// Always returns nullptr.
+void *detect_main(void *Arg) {
+    auto *TargetHost = static_cast<Host *>(Arg);
+    TargetHost->detect();
+    return nullptr;
}
-void DetectableHost::stopAnomalyDetectionThreads() {
- netdata_thread_cancel(TrainingThread.native_handle());
- netdata_thread_cancel(DetectionThread.native_handle());
+// Creates the training and detection threads of this host as joinable
+// threads, tagged "TRAIN[<hostname>]" and "DETECT[<hostname>]". When the
+// threads are already marked as running, only an error is logged.
+void Host::startAnomalyDetectionThreads() {
+    if (!ThreadsRunning) {
+        ThreadsRunning = true;
+
+        char Tag[NETDATA_THREAD_TAG_MAX + 1];
+        void *Self = static_cast<void *>(this);
+
+        // The same buffer is reused for both thread tags.
+        snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%s]", rrdhost_hostname(RH));
+        netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, Self);
+
+        snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "DETECT[%s]", rrdhost_hostname(RH));
+        netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, Self);
+        return;
+    }
+
+    error("Anomaly detections threads for host %s are already-up and running.", rrdhost_hostname(RH));
+}
+
+// Stops the threads created by startAnomalyDetectionThreads(): the training
+// thread is signalled through its queue, cancelled and joined, then the
+// detection thread is cancelled and joined. When the threads are not marked
+// as running, only an error is logged.
+void Host::stopAnomalyDetectionThreads() {
+    if (ThreadsRunning) {
+        ThreadsRunning = false;
+
+        // Signal the training queue to stop popping-items
+        TrainingQueue.signal();
+        netdata_thread_cancel(TrainingThread);
+        netdata_thread_join(TrainingThread, nullptr);
- TrainingThread.join();
- DetectionThread.join();
+        netdata_thread_cancel(DetectionThread);
+        netdata_thread_join(DetectionThread, nullptr);
+        return;
+    }
+
+    error("Anomaly detections threads for host %s have already been stopped.", rrdhost_hostname(RH));
}