summaryrefslogtreecommitdiffstats
path: root/ml/Host.cc
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2022-03-30 13:38:18 +0300
committerGitHub <noreply@github.com>2022-03-30 13:38:18 +0300
commit41a40dc3a406c3c8dc70f41038e7d75ef2601f8b (patch)
treed889c164f2be7588a4c5940cf777ccd94cb39e7a /ml/Host.cc
parentffee2317885bf8ceab7224ba23aad08421986cd5 (diff)
ML-related changes to address issue/discussion comments. (#12494)
* Increase training thread's max sleep time. With this change we will only cap the allotted time when it is more than ten seconds. The previous limit was one second, which had the effect of scheduling dimensions near the beginning of each training window. This was not desirable because it would cause high CPU usage on parents with many children. * Only exclude netdata.* charts from training. * Use heartbeat in detection thread. * Track rusage of prediction thread. * Track rusage of training thread. * Add support for random sampling of extracted features. * Rebase * Skip RNG when ML is disabled and fix undef behaviour
Diffstat (limited to 'ml/Host.cc')
-rw-r--r--ml/Host.cc77
1 files changed, 35 insertions, 42 deletions
diff --git a/ml/Host.cc b/ml/Host.cc
index a1a89f3042..59a7c93c98 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -184,13 +184,13 @@ static void updateEventsChart(RRDHOST *RH,
rrdset_done(RS);
}
-static void updateDetectionChart(RRDHOST *RH, collected_number PredictionDuration) {
+static void updateDetectionChart(RRDHOST *RH) {
static thread_local RRDSET *RS = nullptr;
- static thread_local RRDDIM *PredictiobDurationRD = nullptr;
+ static thread_local RRDDIM *UserRD, *SystemRD = nullptr;
if (!RS) {
std::string IdPrefix = "prediction_stats";
- std::string TitlePrefix = "Time it took to run prediction for host";
+ std::string TitlePrefix = "Prediction thread CPU usage for host";
auto IdTitlePair = getHostSpecificIdAndTitle(RH, IdPrefix, TitlePrefix);
RS = rrdset_create_localhost(
@@ -200,35 +200,36 @@ static void updateDetectionChart(RRDHOST *RH, collected_number PredictionDuratio
"prediction_stats", // family
"anomaly_detection.prediction_stats", // ctx
IdTitlePair.second.c_str(), // title
- "milliseconds", // units
+ "milliseconds/s", // units
"netdata", // plugin
"ml", // module
39187, // priority
RH->rrd_update_every, // update_every
- RRDSET_TYPE_LINE // chart_type
+ RRDSET_TYPE_STACKED // chart_type
);
- PredictiobDurationRD = rrddim_add(RS, "duration", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
} else
rrdset_next(RS);
- rrddim_set_by_pointer(RS, PredictiobDurationRD, PredictionDuration);
+ struct rusage TRU;
+ getrusage(RUSAGE_THREAD, &TRU);
+ rrddim_set_by_pointer(RS, UserRD, TRU.ru_utime.tv_sec * 1000000ULL + TRU.ru_utime.tv_usec);
+ rrddim_set_by_pointer(RS, SystemRD, TRU.ru_stime.tv_sec * 1000000ULL + TRU.ru_stime.tv_usec);
rrdset_done(RS);
}
-static void updateTrainingChart(RRDHOST *RH,
- collected_number TotalTrainingDuration,
- collected_number MaxTrainingDuration)
+static void updateTrainingChart(RRDHOST *RH, struct rusage *TRU)
{
static thread_local RRDSET *RS = nullptr;
- static thread_local RRDDIM *TotalTrainingDurationRD = nullptr;
- static thread_local RRDDIM *MaxTrainingDurationRD = nullptr;
+ static thread_local RRDDIM *UserRD = nullptr;
+ static thread_local RRDDIM *SystemRD = nullptr;
if (!RS) {
std::string IdPrefix = "training_stats";
- std::string TitlePrefix = "Training step statistics for host";
+ std::string TitlePrefix = "Training thread CPU usage for host";
auto IdTitlePair = getHostSpecificIdAndTitle(RH, IdPrefix, TitlePrefix);
RS = rrdset_create_localhost(
@@ -238,24 +239,21 @@ static void updateTrainingChart(RRDHOST *RH,
"training_stats", // family
"anomaly_detection.training_stats", // ctx
IdTitlePair.second.c_str(), // title
- "milliseconds", // units
+ "milliseconds/s", // units
"netdata", // plugin
"ml", // module
39188, // priority
RH->rrd_update_every, // update_every
- RRDSET_TYPE_LINE // chart_type
+ RRDSET_TYPE_STACKED // chart_type
);
- TotalTrainingDurationRD = rrddim_add(RS, "total_training_duration", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
- MaxTrainingDurationRD = rrddim_add(RS, "max_training_duration", NULL,
- 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ UserRD = rrddim_add(RS, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
+ SystemRD = rrddim_add(RS, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
} else
rrdset_next(RS);
- rrddim_set_by_pointer(RS, TotalTrainingDurationRD, TotalTrainingDuration);
- rrddim_set_by_pointer(RS, MaxTrainingDurationRD, MaxTrainingDuration);
-
+ rrddim_set_by_pointer(RS, UserRD, TRU->ru_utime.tv_sec * 1000000ULL + TRU->ru_utime.tv_usec);
+ rrddim_set_by_pointer(RS, SystemRD, TRU->ru_stime.tv_sec * 1000000ULL + TRU->ru_stime.tv_usec);
rrdset_done(RS);
}
@@ -307,6 +305,7 @@ void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
Json["smooth-n"] = Cfg.SmoothN;
Json["lag-n"] = Cfg.LagN;
+ Json["random-sampling-ratio"] = Cfg.RandomSamplingRatio;
Json["max-kmeans-iters"] = Cfg.MaxKMeansIters;
Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold;
@@ -345,11 +344,7 @@ void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
return;
D->LastTrainedAt = NowTP + Seconds{D->updateEvery()};
-
- TimePoint StartTP = SteadyClock::now();
D->trainModel();
- Duration<double> Duration = SteadyClock::now() - StartTP;
- D->updateTrainingDuration(Duration.count());
{
std::lock_guard<std::mutex> Lock(Mutex);
@@ -358,9 +353,11 @@ void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
}
void TrainableHost::train() {
- Duration<double> MaxSleepFor = Seconds{updateEvery()};
+ Duration<double> MaxSleepFor = Seconds{10 * updateEvery()};
while (!netdata_exit) {
+ updateResourceUsage();
+
TimePoint NowTP = SteadyClock::now();
auto P = findDimensionToTrain(NowTP);
@@ -393,9 +390,6 @@ void DetectableHost::detectOnce() {
size_t NumNormalDimensions = 0;
size_t NumTrainedDimensions = 0;
- double TotalTrainingDuration = 0.0;
- double MaxTrainingDuration = 0.0;
-
bool CollectAnomalyRates = (++AnomalyRateTimer == Cfg.DBEngineAnomalyRateEvery);
if (CollectAnomalyRates)
rrdset_next(AnomalyRateRS);
@@ -414,10 +408,6 @@ void DetectableHost::detectOnce() {
NumTrainedDimensions += D->isTrained();
- double DimTrainingDuration = D->updateTrainingDuration(0.0);
- MaxTrainingDuration = std::max(MaxTrainingDuration, DimTrainingDuration);
- TotalTrainingDuration += DimTrainingDuration;
-
if (IsAnomalous)
NumAnomalousDimensions += 1;
@@ -448,7 +438,10 @@ void DetectableHost::detectOnce() {
updateRateChart(getRH(), WindowAnomalyRate * 10000.0);
updateWindowLengthChart(getRH(), WindowLength);
updateEventsChart(getRH(), P, ResetBitCounter, NewAnomalyEvent);
- updateTrainingChart(getRH(), TotalTrainingDuration * 1000.0, MaxTrainingDuration * 1000.0);
+
+ struct rusage TRU;
+ getResourceUsage(&TRU);
+ updateTrainingChart(getRH(), &TRU);
if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0))
return;
@@ -477,15 +470,15 @@ void DetectableHost::detectOnce() {
void DetectableHost::detect() {
std::this_thread::sleep_for(Seconds{10});
+ heartbeat_t HB;
+ heartbeat_init(&HB);
+
while (!netdata_exit) {
- TimePoint StartTP = SteadyClock::now();
- detectOnce();
- TimePoint EndTP = SteadyClock::now();
+ heartbeat_next(&HB, updateEvery() * USEC_PER_SEC);
- Duration<double> Dur = EndTP - StartTP;
- updateDetectionChart(getRH(), Dur.count() * 1000);
+ detectOnce();
- std::this_thread::sleep_for(Seconds{updateEvery()});
+ updateDetectionChart(getRH());
}
}