summaryrefslogtreecommitdiffstats
path: root/ml
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-10 19:59:21 +0200
committerGitHub <noreply@github.com>2023-01-10 19:59:21 +0200
commit368a26cfee6887ca0cb2301d93138f63b75e353a (patch)
treeb57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /ml
parentb513888be389f92b2323d1bb3fdf55c22d4e4bad (diff)
DBENGINE v2 (#14125)
* count open cache pages refering to datafile * eliminate waste flush attempts * remove eliminated variable * journal v2 scanning split functions * avoid locking open cache for a long time while migrating to journal v2 * dont acquire datafile for the loop; disable thread cancelability while a query is running * work on datafile acquiring * work on datafile deletion * work on datafile deletion again * logs of dbengine should start with DBENGINE * thread specific key for queries to check if a query finishes without a finalize * page_uuid is not used anymore * Cleanup judy traversal when building new v2 Remove not needed calls to metric registry * metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent * disable checks for invalid time-ranges * Remove type from page details * report scanning time * remove infinite loop from datafile acquire for deletion * remove infinite loop from datafile acquire for deletion again * trace query handles * properly allocate array of dimensions in replication * metrics cleanup * metrics registry uses arrayalloc * arrayalloc free should be protected by lock * use array alloc in page cache * journal v2 scanning fix * datafile reference leaking hunding * do not load metrics of future timestamps * initialize reasons * fix datafile reference leak * do not load pages that are entirely overlapped by others * expand metric retention atomically * split replication logic in initialization and execution * replication prepare ahead queries * replication prepare ahead queries fixed * fix replication workers accounting * add router active queries chart * restore accounting of pages metadata sources; cleanup replication * dont count skipped pages as unroutable * notes on services shutdown * do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file * do not add pages we dont need to pdc * time in range re-work to provide info about past and future matches * finner control on the pages selected for processing; accounting of page related issues * fix invalid reference to handle->page * eliminate data collection handle of pg_lookup_next * accounting for queries with gaps * query preprocessing the same way the processing is done; cache now supports all operations on Judy * dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3) * get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query * finner gaps calculation; accounting of overlapping pages in queries * fix gaps accounting * move datafile deletion to worker thread * tune libuv workers and thread stack size * stop netdata threads gradually * run indexing together with cache flush/evict * more work on clean shutdown * limit the number of pages to evict per run * do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction * economies on flags for smaller page footprint; cleanup and renames * eviction moves referenced pages to the end of the queue * use murmur hash for indexing partition * murmur should be static * use more indexing partitions * revert number of partitions to number of cpus * cancel threads first, then stop services * revert default thread stack size * dont execute replication requests of disconnected senders * wait more time for services that are exiting gradually * fixed last commit * finer control on page selection algorithm * default stacksize of 1MB * fix formatting * fix worker utilization going crazy when the number is rotating * avoid buffer full due to replication preprocessing of requests * support query priorities * add count of spins in spinlock when compiled with netdata internal checks * remove prioritization from dbengine queries; cache now uses mutexes for the queues * hot pages are now in sections judy arrays, like dirty * align replication queries to optimal page size * during flushing add to clean and evict in batches * Revert "during flushing add to clean and evict in batches" This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7. * dont lock clean while evicting pages during flushing * Revert "dont lock clean while evicting pages during flushing" This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3. * Revert "Revert "during flushing add to clean and evict in batches"" This reverts commit ca7a187537fb8f743992700427e13042561211ec. * dont cross locks during flushing, for the fastest flushes possible * low-priority queries load pages synchronously * Revert "low-priority queries load pages synchronously" This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7. * cache uses spinlock again * during flushing, dont lock the clean queue at all; each item is added atomically * do smaller eviction runs * evict one page at a time to minimize lock contention on the clean queue * fix eviction statistics * fix last commit * plain should be main cache * event loop cleanup; evictions and flushes can now happen concurrently * run flush and evictions from tier0 only * remove not needed variables * flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time * added worker jobs in timer to find the slow part of it * support fast eviction of pages when all_of_them is set * revert default thread stack size * bypass event loop for dispatching read extent commands to workers - send them directly * Revert "bypass event loop for dispatching read extent commands to workers - send them directly" This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce. * cache work requests * minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors * publish flushed pages to open cache in the thread pool * prevent eventloop requests from getting stacked in the event loop * single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c * more rrdengine.c cleanup * enable db rotation * do not log when there is a filter * do not run multiple migration to journal v2 * load all extents async * fix wrong paste * report opcodes waiting, works dispatched, works executing * cleanup event loop memory every 10 minutes * dont dispatch more work requests than the number of threads available * use the dispatched counter instead of the executing counter to check if the worker thread pool is full * remove UV_RUN_NOWAIT * replication to fill the queues * caching of extent buffers; code cleanup * caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation * single transaction wal * synchronous flushing * first cancel the threads, then signal them to exit * caching of rrdeng query handles; added priority to query target; health is now low prio * add priority to the missing points; do not allow critical priority in queries * offload query preparation and routing to libuv thread pool * updated timing charts for the offloaded query preparation * caching of WALs * accounting for struct caches (buffers); do not load extents with invalid sizes * protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset * also check if the expanded before is not the chart later updated time * also check if the expanded before is not after the wall clock time of when the query started * Remove unused variable * replication to queue less queries; cleanup of internal fatals * Mark dimension to be updated async * caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol) * disable pgc stress test, under an ifdef * disable mrg stress test under an ifdef * Mark chart and host labels, host info for async check and store in the database * dictionary items use arrayalloc * cache section pages structure is allocated with arrayalloc * Add function to wakeup the aclk query threads and check for exit Register function to be called during shutdown after signaling the service to exit * parallel preparation of all dimensions of queries * be more sensitive to enable streaming after replication * atomically finish chart replication * fix last commit * fix last commit again * fix last commit again again * fix last commit again again again * unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication * do not cancel start streaming; use high priority queries when we have locked chart data collection * prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered * opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue * Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore * Fix bad memory allocation / cleanup * Cleanup ACLK sync initialization (part 1) * Don't update metric registry during shutdown (part 1) * Prevent crash when dashboard is refreshed and host goes away * Mark ctx that is shutting down. Test not adding flushed pages to open cache as hot if we are shutting down * make ML work * Fix compile without NETDATA_INTERNAL_CHECKS * shutdown each ctx independently * fix completion of quiesce * do not update shared ML charts * Create ML charts on child hosts. When a parent runs a ML for a child, the relevant-ML charts should be created on the child host. These charts should use the parent's hostname to differentiate multiple parents that might run ML for a child. The only exception to this rule is the training/prediction resource usage charts. These are created on the localhost of the parent host, because they provide information specific to said host. * check new ml code * first save the database, then free all memory * dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db * Cleanup metadata thread (part 2) * increase refcount before dispatching prep command * Do not try to stop anomaly detection threads twice. A separate function call has been added to stop anomaly detection threads. This commit removes the left over function calls that were made internally when a host was being created/destroyed. * Remove allocations when smoothing samples buffer The number of dims per sample is always 1, ie. we are training and predicting only individual dimensions. * set the orphan flag when loading archived hosts * track worker dispatch callbacks and threadpool worker init * make ML threads joinable; mark ctx having flushing in progress as early as possible * fix allocation counter * Cleanup metadata thread (part 3) * Cleanup metadata thread (part 4) * Skip metadata host scan when running unittest * unittest support during init * dont use all the libuv threads for queries * break an infinite loop when sleep_usec() is interrupted * ml prediction is a collector for several charts * sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit * worker_unregister() in netdata threads cleanup * moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h * fixed ML issues * removed engine2 directory * added dbengine2 files in CMakeLists.txt * move query plan data to query target, so that they can be exposed by in jsonwrap * uniform definition of query plan according to the other query target members * event_loop should be in daemon, not libnetdata * metric_retention_by_uuid() is now part of the storage engine abstraction * unify time_t variables to have the suffix _s (meaning: seconds) * old dbengine statistics become "dbengine io" * do not enable ML resource usage charts by default * unify ml chart families, plugins and modules * cleanup query plans from query target * cleanup all extent buffers * added debug info for rrddim slot to time * rrddim now does proper gap management * full rewrite of the mem modes * use library functions for madvise * use CHECKSUM_SZ for the checksum size * fix coverity warning about the impossible case of returning a page that is entirely in the past of the query * fix dbengine shutdown * keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently * fine tune cache evictions * dont initialize health if the health service is not running - prevent crash on shutdown while children get connected * rename AS threads to ACLK[hostname] * prevent re-use of uninitialized memory in queries * use JulyL instead of JudyL for PDC operations - to test it first * add also JulyL files * fix July memory accounting * disable July for PDC (use Judy) * use the function to remove datafiles from linked list * fix july and event_loop * add july to libnetdata subdirs * rename time_t variables that end in _t to end in _s * replicate when there is a gap at the beginning of the replication period * reset postponing of sender connections when a receiver is connected * Adjust update every properly * fix replication infinite loop due to last change * packed enums in rrd.h and cleanup of obsolete rrd structure members * prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests() * void unused variable * void unused variables * fix indentation * entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps * macros to caclulate page entries by time and size * prevent statsd cleanup crash on exit * cleanup health thread related variables Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'ml')
-rw-r--r--ml/ADCharts.cc123
-rw-r--r--ml/Dimension.cc6
-rw-r--r--ml/Dimension.h10
-rw-r--r--ml/Host.cc62
-rw-r--r--ml/Query.h8
-rw-r--r--ml/SamplesBuffer.cc11
-rw-r--r--ml/SamplesBuffer.h4
-rw-r--r--ml/ml.cc4
8 files changed, 133 insertions, 95 deletions
diff --git a/ml/ADCharts.cc b/ml/ADCharts.cc
index d77b36612c..cbb13f5d19 100644
--- a/ml/ADCharts.cc
+++ b/ml/ADCharts.cc
@@ -21,16 +21,16 @@ void ml::updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS) {
NameSS << "machine_learning_status_on_" << rrdhost_hostname(localhost);
MachineLearningStatusRS = rrdset_create(
- RH,
- "netdata", // type
+ RH,
+ "netdata", // type
IdSS.str().c_str(), // id
NameSS.str().c_str(), // name
- "ml", // family
+ NETDATA_ML_CHART_FAMILY, // family
"netdata.machine_learning_status", // ctx
"Machine learning status", // title
"dimensions", // units
- "netdata", // plugin
- "ml", // module
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_TRAINING, // module
NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
@@ -65,16 +65,16 @@ void ml::updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS) {
NameSS << "metric_types_on_" << rrdhost_hostname(localhost);
MetricTypesRS = rrdset_create(
- RH,
- "netdata", // type
+ RH,
+ "netdata", // type
IdSS.str().c_str(), // id
NameSS.str().c_str(), // name
- "ml", // family
+ NETDATA_ML_CHART_FAMILY, // family
"netdata.metric_types", // ctx
"Dimensions by metric type", // title
"dimensions", // units
- "netdata", // plugin
- "ml", // module
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_TRAINING, // module
NETDATA_ML_CHART_PRIO_METRIC_TYPES, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
@@ -109,16 +109,16 @@ void ml::updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS) {
NameSS << "training_status_on_" << rrdhost_hostname(localhost);
TrainingStatusRS = rrdset_create(
- RH,
- "netdata", // type
+ RH,
+ "netdata", // type
IdSS.str().c_str(), // id
NameSS.str().c_str(), // name
- "ml", // family
+ NETDATA_ML_CHART_FAMILY, // family
"netdata.training_status", // ctx
"Training status of dimensions", // title
"dimensions", // units
- "netdata", // plugin
- "ml", // module
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_TRAINING, // module
NETDATA_ML_CHART_PRIO_TRAINING_STATUS, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
@@ -156,16 +156,16 @@ void ml::updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS) {
NameSS << "dimensions_on_" << rrdhost_hostname(localhost);
PredictionRS = rrdset_create(
- RH,
- "anomaly_detection", // type
+ RH,
+ "anomaly_detection", // type
IdSS.str().c_str(), // id
NameSS.str().c_str(), // name
"dimensions", // family
"anomaly_detection.dimensions", // ctx
"Anomaly detection dimensions", // title
"dimensions", // units
- "netdata", // plugin
- "ml", // module
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_TRAINING, // module
ML_CHART_PRIO_DIMENSIONS, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
@@ -195,16 +195,16 @@ void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyR
NameSS << "anomaly_rate_on_" << rrdhost_hostname(localhost);
HostRateRS = rrdset_create(
- RH,
- "anomaly_detection", // type
+ RH,
+ "anomaly_detection", // type
IdSS.str().c_str(), // id
NameSS.str().c_str(), // name
"anomaly_rate", // family
"anomaly_detection.anomaly_rate", // ctx
"Percentage of anomalous dimensions", // title
"percentage", // units
- "netdata", // plugin
- "ml", // module
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_DETECTION, // module
ML_CHART_PRIO_ANOMALY_RATE, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
@@ -229,16 +229,16 @@ void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyR
NameSS << "anomaly_detection_on_" << rrdhost_hostname(localhost);
AnomalyDetectionRS = rrdset_create(
- RH,
- "anomaly_detection", // type
+ RH,
+ "anomaly_detection", // type
IdSS.str().c_str(), // id
NameSS.str().c_str(), // name
"anomaly_detection", // family
"anomaly_detection.detector_events", // ctx
"Anomaly detection events", // title
"percentage", // units
- "netdata", // plugin
- "ml", // module
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_DETECTION, // module
ML_CHART_PRIO_DETECTOR_EVENTS, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
@@ -271,20 +271,21 @@ void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyR
NULL /* group options */,
0, /* timeout */
0, /* tier */
- QUERY_SOURCE_ML
+ QUERY_SOURCE_ML,
+ STORAGE_PRIORITY_BEST_EFFORT
);
if(R) {
- assert(R->d == 1 && R->n == 1 && R->rows == 1);
-
- static thread_local bool PrevAboveThreshold = false;
- bool AboveThreshold = R->v[0] >= Cfg.HostAnomalyRateThreshold;
- bool NewAnomalyEvent = AboveThreshold && !PrevAboveThreshold;
- PrevAboveThreshold = AboveThreshold;
-
- rrddim_set_by_pointer(AnomalyDetectionRS, AboveThresholdRD, AboveThreshold);
- rrddim_set_by_pointer(AnomalyDetectionRS, NewAnomalyEventRD, NewAnomalyEvent);
- rrdset_done(AnomalyDetectionRS);
+ if(R->d == 1 && R->n == 1 && R->rows == 1) {
+ static thread_local bool PrevAboveThreshold = false;
+ bool AboveThreshold = R->v[0] >= Cfg.HostAnomalyRateThreshold;
+ bool NewAnomalyEvent = AboveThreshold && !PrevAboveThreshold;
+ PrevAboveThreshold = AboveThreshold;
+
+ rrddim_set_by_pointer(AnomalyDetectionRS, AboveThresholdRD, AboveThreshold);
+ rrddim_set_by_pointer(AnomalyDetectionRS, NewAnomalyEventRD, NewAnomalyEvent);
+ rrdset_done(AnomalyDetectionRS);
+ }
rrdr_free(OWA, R);
}
@@ -309,15 +310,15 @@ void ml::updateResourceUsageCharts(RRDHOST *RH, const struct rusage &PredictionR
NameSS << "prediction_usage_for_" << rrdhost_hostname(RH);
RS = rrdset_create_localhost(
- "netdata", // type
+ "netdata", // type
IdSS.str().c_str(), // id
NameSS.str().c_str(), // name
- "ml", // family
+ NETDATA_ML_CHART_FAMILY, // family
"netdata.prediction_usage", // ctx
"Prediction resource usage", // title
"milliseconds/s", // units
- "netdata", // plugin
- "ml", // module
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_PREDICTION, // module
NETDATA_ML_CHART_PRIO_PREDICTION_USAGE, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_STACKED // chart_type
@@ -350,15 +351,15 @@ void ml::updateResourceUsageCharts(RRDHOST *RH, const struct rusage &PredictionR
NameSS << "training_usage_for_" << rrdhost_hostname(RH);
RS = rrdset_create_localhost(
- "netdata", // type
+ "netdata", // type
IdSS.str().c_str(), // id
NameSS.str().c_str(), // name
- "ml", // family
+ NETDATA_ML_CHART_FAMILY, // family
"netdata.training_usage", // ctx
"Training resource usage", // title
"milliseconds/s", // units
- "netdata", // plugin
- "ml", // module
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_TRAINING, // module
NETDATA_ML_CHART_PRIO_TRAINING_USAGE, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_STACKED // chart_type
@@ -393,16 +394,16 @@ void ml::updateTrainingStatisticsChart(RRDHOST *RH, const TrainingStats &TS) {
NameSS << "queue_stats_on_" << rrdhost_hostname(localhost);
RS = rrdset_create(
- RH,
- "netdata", // type
+ RH,
+ "netdata", // type
IdSS.str().c_str(), // id
NameSS.str().c_str(), // name
- "ml", // family
+ NETDATA_ML_CHART_FAMILY, // family
"netdata.queue_stats", // ctx
"Training queue stats", // title
"items", // units
- "netdata", // plugin
- "ml", // module
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_TRAINING, // module
NETDATA_ML_CHART_PRIO_QUEUE_STATS, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE// chart_type
@@ -436,16 +437,16 @@ void ml::updateTrainingStatisticsChart(RRDHOST *RH, const TrainingStats &TS) {
NameSS << "training_time_stats_on_" << rrdhost_hostname(localhost);
RS = rrdset_create(
- RH,
- "netdata", // type
+ RH,
+ "netdata", // type
IdSS.str().c_str(), // id
NameSS.str().c_str(), // name
- "ml", // family
+ NETDATA_ML_CHART_FAMILY, // family
"netdata.training_time_stats", // ctx
"Training time stats", // title
"milliseconds", // units
- "netdata", // plugin
- "ml", // module
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_TRAINING, // module
NETDATA_ML_CHART_PRIO_TRAINING_TIME_STATS, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE// chart_type
@@ -483,16 +484,16 @@ void ml::updateTrainingStatisticsChart(RRDHOST *RH, const TrainingStats &TS) {
NameSS << "training_results_on_" << rrdhost_hostname(localhost);
RS = rrdset_create(
- RH,
- "netdata", // type
+ RH,
+ "netdata", // type
IdSS.str().c_str(), // id
NameSS.str().c_str(), // name
- "ml", // family
+ NETDATA_ML_CHART_FAMILY, // family
"netdata.training_results", // ctx
"Training results", // title
"events", // units
- "netdata", // plugin
- "ml", // module
+ NETDATA_ML_PLUGIN, // plugin
+ NETDATA_ML_MODULE_TRAINING, // module
NETDATA_ML_CHART_PRIO_TRAINING_RESULTS, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE// chart_type
diff --git a/ml/Dimension.cc b/ml/Dimension.cc
index ef324a0736..62699aaad7 100644
--- a/ml/Dimension.cc
+++ b/ml/Dimension.cc
@@ -70,8 +70,8 @@ std::pair<CalculatedNumber *, TrainingResponse> Dimension::getCalculatedNumbers(
TrainingResp.FirstEntryOnRequest = TrainingReq.FirstEntryOnRequest;
TrainingResp.LastEntryOnRequest = TrainingReq.LastEntryOnRequest;
- TrainingResp.FirstEntryOnResponse = rrddim_first_entry_t_of_tier(RD, 0);
- TrainingResp.LastEntryOnResponse = rrddim_last_entry_t_of_tier(RD, 0);
+ TrainingResp.FirstEntryOnResponse = rrddim_first_entry_s_of_tier(RD, 0);
+ TrainingResp.LastEntryOnResponse = rrddim_last_entry_s_of_tier(RD, 0);
size_t MinN = Cfg.MinTrainSamples;
size_t MaxN = Cfg.MaxTrainSamples;
@@ -195,7 +195,7 @@ TrainingResult Dimension::trainModel(const TrainingRequest &TrainingReq) {
MT = MetricType::Constant;
TS = TrainingStatus::Trained;
TR = TrainingResp;
- LastTrainingTime = rrddim_last_entry_t(RD);
+ LastTrainingTime = rrddim_last_entry_s(RD);
}
delete[] CNs;
diff --git a/ml/Dimension.h b/ml/Dimension.h
index 4d06c9372d..2b1adfff9e 100644
--- a/ml/Dimension.h
+++ b/ml/Dimension.h
@@ -166,11 +166,11 @@ public:
private:
TrainingRequest getTrainingRequest(time_t CurrT) const {
return TrainingRequest {
- string_dup(RD->rrdset->id),
- string_dup(RD->id),
- CurrT,
- rrddim_first_entry_t(RD),
- rrddim_last_entry_t(RD)
+ string_dup(RD->rrdset->id),
+ string_dup(RD->id),
+ CurrT,
+ rrddim_first_entry_s(RD),
+ rrddim_last_entry_s(RD)
};
}
diff --git a/ml/Host.cc b/ml/Host.cc
index fcf97b2b8d..5980f610d8 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -54,7 +54,15 @@ void Host::getModelsAsJson(nlohmann::json &Json) {
}
}
+#define WORKER_JOB_DETECTION_PREP 0
+#define WORKER_JOB_DETECTION_DIM_CHART 1
+#define WORKER_JOB_DETECTION_HOST_CHART 2
+#define WORKER_JOB_DETECTION_STATS 3
+#define WORKER_JOB_DETECTION_RESOURCES 4
+
void Host::detectOnce() {
+ worker_is_busy(WORKER_JOB_DETECTION_PREP);
+
MLS = {};
MachineLearningStats MLSCopy = {};
TrainingStats TSCopy = {};
@@ -134,13 +142,20 @@ void Host::detectOnce() {
TSCopy.RemainingUT = 0;
}
+ worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
updateDimensionsChart(RH, MLSCopy);
+
+ worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
updateHostAndDetectionRateCharts(RH, HostAnomalyRate * 10000.0);
+#ifdef NETDATA_ML_RESOURCE_CHARTS
+ worker_is_busy(WORKER_JOB_DETECTION_RESOURCES);
struct rusage PredictionRU;
getrusage(RUSAGE_THREAD, &PredictionRU);
updateResourceUsageCharts(RH, PredictionRU, TSCopy.TrainingRU);
+#endif
+ worker_is_busy(WORKER_JOB_DETECTION_STATS);
updateTrainingStatisticsChart(RH, TSCopy);
}
@@ -150,7 +165,6 @@ public:
RRDDIM_ACQUIRED *AcqRD = nullptr;
Dimension *D = nullptr;
- rrdhost_rdlock(RH);
RRDSET *RS = rrdset_find(RH, string2str(ChartId));
if (RS) {
AcqRD = rrddim_find_and_acquire(RS, string2str(DimensionId));
@@ -160,7 +174,6 @@ public:
D = reinterpret_cast<Dimension *>(RD->ml_dimension);
}
}
- rrdhost_unlock(RH);
return AcquiredDimension(AcqRD, D);
}
@@ -190,8 +203,19 @@ void Host::scheduleForTraining(TrainingRequest TR) {
TrainingQueue.push(TR);
}
+#define WORKER_JOB_TRAINING_FIND 0
+#define WORKER_JOB_TRAINING_TRAIN 1
+#define WORKER_JOB_TRAINING_STATS 2
+
void Host::train() {
- while (!netdata_exit) {
+ worker_register("MLTRAIN");
+ worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find");
+ worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
+ worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
+
+ service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true);
+
+ while (service_running(SERVICE_ML_TRAINING)) {
auto P = TrainingQueue.pop();
TrainingRequest TrainingReq = P.first;
size_t Size = P.second;
@@ -200,15 +224,21 @@ void Host::train() {
if (AllottedUT > USEC_PER_SEC)
AllottedUT = USEC_PER_SEC;
- usec_t StartUT = now_realtime_usec();
+ usec_t StartUT = now_monotonic_usec();
TrainingResult TrainingRes;
{
+ worker_is_busy(WORKER_JOB_TRAINING_FIND);
AcquiredDimension AcqDim = AcquiredDimension::find(RH, TrainingReq.ChartId, TrainingReq.DimensionId);
+
+ worker_is_busy(WORKER_JOB_TRAINING_TRAIN);
TrainingRes = AcqDim.train(TrainingReq);
+
string_freez(TrainingReq.ChartId);
string_freez(TrainingReq.DimensionId);
}
- usec_t ConsumedUT = now_realtime_usec() - StartUT;
+ usec_t ConsumedUT = now_monotonic_usec() - StartUT;
+
+ worker_is_busy(WORKER_JOB_TRAINING_STATS);
usec_t RemainingUT = 0;
if (ConsumedUT < AllottedUT)
@@ -249,15 +279,27 @@ void Host::train() {
}
}
+ worker_is_idle();
std::this_thread::sleep_for(std::chrono::microseconds{RemainingUT});
+ worker_is_busy(0);
}
}
void Host::detect() {
+ worker_register("MLDETECT");
+ worker_register_job_name(WORKER_JOB_DETECTION_PREP, "prep");
+ worker_register_job_name(WORKER_JOB_DETECTION_DIM_CHART, "dim chart");
+ worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
+ worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats");
+ worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources");
+
+ service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true);
+
heartbeat_t HB;
heartbeat_init(&HB);
- while (!netdata_exit) {
+ while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) {
+ worker_is_idle();
heartbeat_next(&HB, RH->rrd_update_every * USEC_PER_SEC);
detectOnce();
}
@@ -294,10 +336,10 @@ void Host::startAnomalyDetectionThreads() {
char Tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%s]", rrdhost_hostname(RH));
- netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this));
+ netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, train_main, static_cast<void *>(this));
snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "DETECT[%s]", rrdhost_hostname(RH));
- netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this));
+ netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, detect_main, static_cast<void *>(this));
}
void Host::stopAnomalyDetectionThreads() {
@@ -311,8 +353,8 @@ void Host::stopAnomalyDetectionThreads() {
// Signal the training queue to stop popping-items
TrainingQueue.signal();
netdata_thread_cancel(TrainingThread);
- netdata_thread_join(TrainingThread, nullptr);
+ // netdata_thread_join(TrainingThread, nullptr);
netdata_thread_cancel(DetectionThread);
- netdata_thread_join(DetectionThread, nullptr);
+ // netdata_thread_join(DetectionThread, nullptr);
}
diff --git a/ml/Query.h b/ml/Query.h
index 78e99db350..bfc8e023be 100644
--- a/ml/Query.h
+++ b/ml/Query.h
@@ -12,15 +12,15 @@ public:
}
time_t latestTime() {
- return Ops->latest_time(RD->tiers[0]->db_metric_handle);
+ return Ops->latest_time_s(RD->tiers[0]->db_metric_handle);
}
time_t oldestTime() {
- return Ops->oldest_time(RD->tiers[0]->db_metric_handle);
+ return Ops->oldest_time_s(RD->tiers[0]->db_metric_handle);
}
void init(time_t AfterT, time_t BeforeT) {
- Ops->init(RD->tiers[0]->db_metric_handle, &Handle, AfterT, BeforeT);
+ Ops->init(RD->tiers[0]->db_metric_handle, &Handle, AfterT, BeforeT, STORAGE_PRIORITY_BEST_EFFORT);
Initialized = true;
points_read = 0;
}
@@ -40,7 +40,7 @@ public:
std::pair<time_t, CalculatedNumber> nextMetric() {
points_read++;
STORAGE_POINT sp = Ops->next_metric(&Handle);
- return { sp.end_time, sp.sum / sp.count };
+ return {sp.end_time_s, sp.sum / sp.count };
}
private:
diff --git a/ml/SamplesBuffer.cc b/ml/SamplesBuffer.cc
index 8fb9246e37..6278e27891 100644
--- a/ml/SamplesBuffer.cc
+++ b/ml/SamplesBuffer.cc
@@ -54,12 +54,12 @@ void SamplesBuffer::diffSamples() {
void SamplesBuffer::smoothSamples() {
// Holds the mean value of each window
- CalculatedNumber *AccCNs = new CalculatedNumber[NumDimsPerSample]();
- Sample Acc(AccCNs, NumDimsPerSample);
+ CalculatedNumber AccCNs[1] = { 0 };
+ Sample Acc(AccCNs, 1);
// Used to avoid clobbering the accumulator when moving the window
- CalculatedNumber *TmpCNs = new CalculatedNumber[NumDimsPerSample]();
- Sample Tmp(TmpCNs, NumDimsPerSample);
+ CalculatedNumber TmpCNs[1] = { 0 };
+ Sample Tmp(TmpCNs, 1);
CalculatedNumber Factor = (CalculatedNumber) 1 / SmoothN;
@@ -88,9 +88,6 @@ void SamplesBuffer::smoothSamples() {
Acc.copy(Tmp);
Acc.scale(Factor);
}
-
- delete[] AccCNs;
- delete[] TmpCNs;
}
void SamplesBuffer::lagSamples() {
diff --git a/ml/SamplesBuffer.h b/ml/SamplesBuffer.h
index 9edc710192..ca60f4b916 100644
--- a/ml/SamplesBuffer.h
+++ b/ml/SamplesBuffer.h
@@ -86,7 +86,9 @@ public:
DiffN(DiffN), SmoothN(SmoothN), LagN(LagN),
SamplingRatio(SamplingRatio), RandNums(RandNums),
BytesPerSample(NumDimsPerSample * sizeof(CalculatedNumber)),
- Preprocessed(false) {};
+ Preprocessed(false) {
+ assert(NumDimsPerSample == 1 && "SamplesBuffer supports only one dimension per sample");
+ };
void preprocess(std::vector<DSample> &Samples);
void preprocess(DSample &Feature);
diff --git a/ml/ml.cc b/ml/ml.cc
index f90b0f52fe..2a0df8867c 100644
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -52,8 +52,6 @@ void ml_host_new(RRDHOST *RH) {
Host *H = new Host(RH);
RH->ml_host = reinterpret_cast<ml_host_t *>(H);
-
- H->startAnomalyDetectionThreads();
}
void ml_host_delete(RRDHOST *RH) {
@@ -61,8 +59,6 @@ void ml_host_delete(RRDHOST *RH) {
if (!H)
return;
- H->stopAnomalyDetectionThreads();
-
delete H;
RH->ml_host = nullptr;
}