summaryrefslogtreecommitdiffstats
path: root/ml/Dimension.cc
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2022-12-22 13:18:55 +0200
committerGitHub <noreply@github.com>2022-12-22 13:18:55 +0200
commit6f42311c4b32d42798f78de1fd43f53694f24e6e (patch)
treea48e85baea0d2feabdcddf1426a6a3c8c46c5568 /ml/Dimension.cc
parentc1aec98b30d8a4e80813cfccd636c31999c7ae3e (diff)
Revert "Refactor ML code and add support for multiple KMeans models. … (#14172)
Diffstat (limited to 'ml/Dimension.cc')
-rw-r--r--ml/Dimension.cc290
1 files changed, 59 insertions, 231 deletions
diff --git a/ml/Dimension.cc b/ml/Dimension.cc
index c2195f175d..bf34abb72f 100644
--- a/ml/Dimension.cc
+++ b/ml/Dimension.cc
@@ -3,174 +3,84 @@
#include "Config.h"
#include "Dimension.h"
#include "Query.h"
-#include "Host.h"
using namespace ml;
-static const char *mls2str(MachineLearningStatus MLS) {
- switch (MLS) {
- case ml::MachineLearningStatus::Enabled:
- return "enabled";
- case ml::MachineLearningStatus::DisabledDueToUniqueUpdateEvery:
- return "disabled-ue";
- case ml::MachineLearningStatus::DisabledDueToExcludedChart:
- return "disabled-sp";
- default:
- return "unknown";
- }
-}
-
-static const char *mt2str(MetricType MT) {
- switch (MT) {
- case ml::MetricType::Constant:
- return "constant";
- case ml::MetricType::Variable:
- return "variable";
- default:
- return "unknown";
- }
+bool Dimension::isActive() const {
+ bool SetObsolete = rrdset_flag_check(RD->rrdset, RRDSET_FLAG_OBSOLETE);
+ bool DimObsolete = rrddim_flag_check(RD, RRDDIM_FLAG_OBSOLETE);
+ return !SetObsolete && !DimObsolete;
}
-static const char *ts2str(TrainingStatus TS) {
- switch (TS) {
- case ml::TrainingStatus::PendingWithModel:
- return "pending-with-model";
- case ml::TrainingStatus::PendingWithoutModel:
- return "pending-without-model";
- case ml::TrainingStatus::Trained:
- return "trained";
- case ml::TrainingStatus::Untrained:
- return "untrained";
- default:
- return "unknown";
- }
-}
-
-static const char *tr2str(TrainingResult TR) {
- switch (TR) {
- case ml::TrainingResult::Ok:
- return "ok";
- case ml::TrainingResult::InvalidQueryTimeRange:
- return "invalid-query";
- case ml::TrainingResult::NotEnoughCollectedValues:
- return "missing-values";
- case ml::TrainingResult::NullAcquiredDimension:
- return "null-acquired-dim";
- case ml::TrainingResult::ChartUnderReplication:
- return "chart-under-replication";
- default:
- return "unknown";
- }
-}
-
-std::pair<CalculatedNumber *, TrainingResponse> Dimension::getCalculatedNumbers(const TrainingRequest &TrainingReq) {
- TrainingResponse TrainingResp = {};
-
- TrainingResp.RequestTime = TrainingReq.RequestTime;
- TrainingResp.FirstEntryOnRequest = TrainingReq.FirstEntryOnRequest;
- TrainingResp.LastEntryOnRequest = TrainingReq.LastEntryOnRequest;
-
- TrainingResp.FirstEntryOnResponse = rrddim_first_entry_t_of_tier(RD, 0);
- TrainingResp.LastEntryOnResponse = rrddim_last_entry_t_of_tier(RD, 0);
-
+std::pair<CalculatedNumber *, size_t> Dimension::getCalculatedNumbers() {
size_t MinN = Cfg.MinTrainSamples;
size_t MaxN = Cfg.MaxTrainSamples;
// Figure out what our time window should be.
- TrainingResp.QueryBeforeT = TrainingResp.LastEntryOnResponse;
- TrainingResp.QueryAfterT = std::max(
- TrainingResp.QueryBeforeT - static_cast<time_t>((MaxN - 1) * updateEvery()),
- TrainingResp.FirstEntryOnResponse
- );
-
- if (TrainingResp.QueryAfterT >= TrainingResp.QueryBeforeT) {
- TrainingResp.Result = TrainingResult::InvalidQueryTimeRange;
- return { nullptr, TrainingResp };
- }
+ time_t BeforeT = now_realtime_sec() - 1;
+ time_t AfterT = BeforeT - (MaxN * updateEvery());
- if (rrdset_is_replicating(RD->rrdset)) {
- TrainingResp.Result = TrainingResult::ChartUnderReplication;
- return { nullptr, TrainingResp };
- }
+ BeforeT -= (BeforeT % updateEvery());
+ AfterT -= (AfterT % updateEvery());
+
+ BeforeT = std::min(BeforeT, latestTime());
+ AfterT = std::max(AfterT, oldestTime());
+
+ if (AfterT >= BeforeT)
+ return { nullptr, 0 };
CalculatedNumber *CNs = new CalculatedNumber[MaxN * (Cfg.LagN + 1)]();
// Start the query.
- size_t Idx = 0;
+ unsigned Idx = 0;
+ unsigned CollectedValues = 0;
+ unsigned TotalValues = 0;
CalculatedNumber LastValue = std::numeric_limits<CalculatedNumber>::quiet_NaN();
Query Q = Query(getRD());
- Q.init(TrainingResp.QueryAfterT, TrainingResp.QueryBeforeT);
+ Q.init(AfterT, BeforeT);
while (!Q.isFinished()) {
if (Idx == MaxN)
break;
auto P = Q.nextMetric();
-
CalculatedNumber Value = P.second;
if (netdata_double_isnumber(Value)) {
- if (!TrainingResp.DbAfterT)
- TrainingResp.DbAfterT = P.first;
- TrainingResp.DbBeforeT = P.first;
-
CNs[Idx] = Value;
LastValue = CNs[Idx];
- TrainingResp.CollectedValues++;
+ CollectedValues++;
} else
CNs[Idx] = LastValue;
Idx++;
}
- TrainingResp.TotalValues = Idx;
-
- if (TrainingResp.CollectedValues < MinN) {
- TrainingResp.Result = TrainingResult::NotEnoughCollectedValues;
+ TotalValues = Idx;
+ if (CollectedValues < MinN) {
delete[] CNs;
- return { nullptr, TrainingResp };
+ return { nullptr, 0 };
}
// Find first non-NaN value.
- for (Idx = 0; std::isnan(CNs[Idx]); Idx++, TrainingResp.TotalValues--) { }
+ for (Idx = 0; std::isnan(CNs[Idx]); Idx++, TotalValues--) { }
// Overwrite NaN values.
if (Idx != 0)
- memmove(CNs, &CNs[Idx], sizeof(CalculatedNumber) * TrainingResp.TotalValues);
+ memmove(CNs, &CNs[Idx], sizeof(CalculatedNumber) * TotalValues);
- TrainingResp.Result = TrainingResult::Ok;
- return { CNs, TrainingResp };
+ return { CNs, TotalValues };
}
-TrainingResult Dimension::trainModel(const TrainingRequest &TrainingReq) {
- auto P = getCalculatedNumbers(TrainingReq);
+MLResult Dimension::trainModel() {
+ auto P = getCalculatedNumbers();
CalculatedNumber *CNs = P.first;
- TrainingResponse TrainingResp = P.second;
-
- if (TrainingResp.Result != TrainingResult::Ok) {
- std::lock_guard<std::mutex> Lock(Mutex);
+ unsigned N = P.second;
- MT = MetricType::Constant;
-
- switch (TS) {
- case TrainingStatus::PendingWithModel:
- TS = TrainingStatus::Trained;
- break;
- case TrainingStatus::PendingWithoutModel:
- TS = TrainingStatus::Untrained;
- break;
- default:
- break;
- }
+ if (!CNs)
+ return MLResult::MissingData;
- TR = TrainingResp;
-
- LastTrainingTime = TrainingResp.LastEntryOnResponse;
- return TrainingResp.Result;
- }
-
- unsigned N = TrainingResp.TotalValues;
unsigned TargetNumSamples = Cfg.MaxTrainSamples * Cfg.RandomSamplingRatio;
double SamplingRatio = std::min(static_cast<double>(TargetNumSamples) / N, 1.0);
@@ -183,81 +93,49 @@ TrainingResult Dimension::trainModel(const TrainingRequest &TrainingReq) {
{
std::lock_guard<std::mutex> Lock(Mutex);
-
- if (Models.size() < Cfg.NumModelsToUse) {
- Models.push_back(std::move(KM));
- } else {
- std::rotate(std::begin(Models), std::begin(Models) + 1, std::end(Models));
- Models[Models.size() - 1] = std::move(KM);
- }
-
- MT = MetricType::Constant;
- TS = TrainingStatus::Trained;
- TR = TrainingResp;
- LastTrainingTime = rrddim_last_entry_t(RD);
+ Models[0] = KM;
}
+ Trained = true;
+ ConstantModel = true;
+
delete[] CNs;
- return TrainingResp.Result;
+ return MLResult::Success;
}
-void Dimension::scheduleForTraining(time_t CurrT) {
- switch (MT) {
- case MetricType::Constant: {
- return;
- } default:
- break;
- }
+bool Dimension::shouldTrain(const TimePoint &TP) const {
+ if (ConstantModel)
+ return false;
- switch (TS) {
- case TrainingStatus::PendingWithModel:
- case TrainingStatus::PendingWithoutModel:
- break;
- case TrainingStatus::Untrained: {
- Host *H = reinterpret_cast<Host *>(RD->rrdset->rrdhost->ml_host);
- TS = TrainingStatus::PendingWithoutModel;
- H->scheduleForTraining(getTrainingRequest(CurrT));
- break;
- }
- case TrainingStatus::Trained: {
- bool NeedsTraining = LastTrainingTime + (Cfg.TrainEvery * updateEvery()) < CurrT;
-
- if (NeedsTraining) {
- Host *H = reinterpret_cast<Host *>(RD->rrdset->rrdhost->ml_host);
- TS = TrainingStatus::PendingWithModel;
- H->scheduleForTraining(getTrainingRequest(CurrT));
- }
- break;
- }
- }
+ return (LastTrainedAt + Seconds(Cfg.TrainEvery * updateEvery())) < TP;
}
-bool Dimension::predict(time_t CurrT, CalculatedNumber Value, bool Exists) {
- // Nothing to do if ML is disabled for this dimension
- if (MLS != MachineLearningStatus::Enabled)
- return false;
-
- // Don't treat values that don't exist as anomalous
+bool Dimension::predict(CalculatedNumber Value, bool Exists) {
if (!Exists) {
CNs.clear();
+ AnomalyBit = false;
return false;
}
- // Save the value and return if we don't have enough values for a sample
unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN;
if (CNs.size() < N) {
CNs.push_back(Value);
+ AnomalyBit = false;
return false;
}
- // Push the value and check if it's different from the last one
- bool SameValue = true;
std::rotate(std::begin(CNs), std::begin(CNs) + 1, std::end(CNs));
+
if (CNs[N - 1] != Value)
- SameValue = false;
+ ConstantModel = false;
+
CNs[N - 1] = Value;
- // Create the sample
+ if (!isTrained() || ConstantModel) {
+ AnomalyBit = false;
+ return false;
+ }
+
CalculatedNumber *TmpCNs = new CalculatedNumber[N * (Cfg.LagN + 1)]();
std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber));
SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1,
@@ -266,80 +144,30 @@ bool Dimension::predict(time_t CurrT, CalculatedNumber Value, bool Exists) {
const DSample Sample = SB.preprocess().back();
delete[] TmpCNs;
- /*
- * Lock to predict and possibly schedule the dimension for training
- */
-
std::unique_lock<std::mutex> Lock(Mutex, std::defer_lock);
if (!Lock.try_lock()) {
+ AnomalyBit = false;
return false;
}
- // Mark the metric time as variable if we received different values
- if (!SameValue)
- MT = MetricType::Variable;
-
- // Decide if the dimension needs to be scheduled for training
- scheduleForTraining(CurrT);
-
- // Nothing to do if we don't have a model
- switch (TS) {
- case TrainingStatus::Untrained:
- case TrainingStatus::PendingWithoutModel:
- return false;
- default:
- break;
- }
-
- /*
- * Use the KMeans models to check if the value is anomalous
- */
-
- size_t ModelsConsulted = 0;
- size_t Sum = 0;
-
for (const auto &KM : Models) {
- ModelsConsulted++;
-
double AnomalyScore = KM.anomalyScore(Sample);
- if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN())
+ if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN()) {
+ AnomalyBit = false;
continue;
+ }
if (AnomalyScore < (100 * Cfg.DimensionAnomalyScoreThreshold)) {
- global_statistics_ml_models_consulted(ModelsConsulted);
+ AnomalyBit = false;
return false;
}
-
- Sum += 1;
}
- global_statistics_ml_models_consulted(ModelsConsulted);
- return Sum;
+ AnomalyBit = true;
+ return true;
}
-std::vector<KMeans> Dimension::getModels() {
+std::array<KMeans, 1> Dimension::getModels() {
std::unique_lock<std::mutex> Lock(Mutex);
return Models;
}
-
-void Dimension::dump() const {
- const char *ChartId = rrdset_id(RD->rrdset);
- const char *DimensionId = rrddim_id(RD);
-
- const char *MLS_Str = mls2str(MLS);
- const char *MT_Str = mt2str(MT);
- const char *TS_Str = ts2str(TS);
- const char *TR_Str = tr2str(TR.Result);
-
- const char *fmt =
- "[ML] %s.%s: MLS=%s, MT=%s, TS=%s, Result=%s, "
- "ReqTime=%ld, FEOReq=%ld, LEOReq=%ld, "
- "FEOResp=%ld, LEOResp=%ld, QTR=<%ld, %ld>, DBTR=<%ld, %ld>, Collected=%zu, Total=%zu";
-
- error(fmt,
- ChartId, DimensionId, MLS_Str, MT_Str, TS_Str, TR_Str,
- TR.RequestTime, TR.FirstEntryOnRequest, TR.LastEntryOnRequest,
- TR.FirstEntryOnResponse, TR.LastEntryOnResponse,
- TR.QueryAfterT, TR.QueryBeforeT, TR.DbAfterT, TR.DbBeforeT, TR.CollectedValues, TR.TotalValues
- );
-}