author     Costa Tsaousis <costa@netdata.cloud>       2023-01-10 19:59:21 +0200
committer  GitHub <noreply@github.com>                2023-01-10 19:59:21 +0200
commit     368a26cfee6887ca0cb2301d93138f63b75e353a (patch)
tree       b57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /ml
parent     b513888be389f92b2323d1bb3fdf55c22d4e4bad (diff)
DBENGINE v2 (#14125)
* count open cache pages referring to datafile
* eliminate wasted flush attempts
* remove eliminated variable
* journal v2 scanning split functions
* avoid locking open cache for a long time while migrating to journal v2
* don't acquire datafile for the loop; disable thread cancelability while a query is running
* work on datafile acquiring
* work on datafile deletion
* work on datafile deletion again
* logs of dbengine should start with DBENGINE
* thread-specific key for queries to check if a query finishes without a finalize
* page_uuid is not used anymore
* Cleanup judy traversal when building new v2
Remove unneeded calls to metric registry
* metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent
* disable checks for invalid time-ranges
* Remove type from page details
* report scanning time
* remove infinite loop from datafile acquire for deletion
* remove infinite loop from datafile acquire for deletion again
* trace query handles
* properly allocate array of dimensions in replication
* metrics cleanup
* metrics registry uses arrayalloc
* arrayalloc free should be protected by lock
* use array alloc in page cache
* journal v2 scanning fix
* datafile reference leak hunting
* do not load metrics of future timestamps
* initialize reasons
* fix datafile reference leak
* do not load pages that are entirely overlapped by others
* expand metric retention atomically
* split replication logic in initialization and execution
* replication prepare ahead queries
* replication prepare ahead queries fixed
* fix replication workers accounting
* add router active queries chart
* restore accounting of pages metadata sources; cleanup replication
* don't count skipped pages as unroutable
* notes on services shutdown
* do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file
* do not add pages we don't need to pdc
* time in range re-work to provide info about past and future matches
* finer control on the pages selected for processing; accounting of page-related issues
* fix invalid reference to handle->page
* eliminate data collection handle of pg_lookup_next
* accounting for queries with gaps
* query preprocessing the same way the processing is done; cache now supports all operations on Judy
* dynamic libuv workers based on the number of processors; minimum of 8 libuv workers; replication query init-ahead uses the libuv workers minus the reserved ones (3)
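The worker-pool sizing rule described in the bullet above can be sketched roughly as follows. This is a hypothetical reconstruction: the names, the CPU multiplier, and the function shapes are assumptions, not netdata's actual code; only the floor of 8 and the 3 reserved workers come from the commit message.

```c
#include <assert.h>
#include <stddef.h>

#define LIBUV_WORKERS_MIN       8   /* never run with fewer workers than this */
#define LIBUV_WORKERS_RESERVED  3   /* kept free for internal dbengine jobs */

/* Scale the libuv worker pool with the processor count (the x2
 * multiplier is an assumption), but never below the floor of 8. */
static size_t libuv_workers(size_t n_cpus) {
    size_t workers = n_cpus * 2; /* hypothetical multiplier */
    return workers < LIBUV_WORKERS_MIN ? LIBUV_WORKERS_MIN : workers;
}

/* Replication query init-ahead may only use the non-reserved workers. */
static size_t replication_prep_workers(size_t n_cpus) {
    return libuv_workers(n_cpus) - LIBUV_WORKERS_RESERVED;
}
```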
* get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query
* finer gaps calculation; accounting of overlapping pages in queries
* fix gaps accounting
* move datafile deletion to worker thread
* tune libuv workers and thread stack size
* stop netdata threads gradually
* run indexing together with cache flush/evict
* more work on clean shutdown
* limit the number of pages to evict per run
* do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction
* economies on flags for smaller page footprint; cleanup and renames
* eviction moves referenced pages to the end of the queue
* use murmur hash for indexing partition
* murmur should be static
* use more indexing partitions
* revert number of partitions to number of cpus
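The murmur-hash partitioning bullets above can be sketched as follows — a minimal, hypothetical example that uses the 32-bit Murmur3 finalizer to spread metric IDs across one cache partition per CPU. The function names are illustrative, not netdata's actual API; only the use of murmur hashing and CPU-count partitions comes from the commit message.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Murmur3 32-bit finalizer ("fmix32"): cheap avalanche mixing so that
 * nearby metric IDs do not all land in the same partition. */
static inline uint32_t murmur32_mix(uint32_t h) {
    h ^= h >> 16;
    h *= 0x85ebca6b;
    h ^= h >> 13;
    h *= 0xc2b2ae35;
    h ^= h >> 16;
    return h;
}

/* Pick an index partition for a metric; n_partitions would typically be
 * the number of CPUs, per the "revert ... to number of cpus" bullet. */
static inline size_t cache_partition(uint32_t metric_id, size_t n_partitions) {
    return murmur32_mix(metric_id) % n_partitions;
}
```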
* cancel threads first, then stop services
* revert default thread stack size
* don't execute replication requests of disconnected senders
* wait more time for services that are exiting gradually
* fixed last commit
* finer control on page selection algorithm
* default stacksize of 1MB
* fix formatting
* fix worker utilization going crazy when the number is rotating
* avoid buffer full due to replication preprocessing of requests
* support query priorities
* add count of spins in spinlock when compiled with netdata internal checks
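The spin-counting bullet above could look roughly like this — a minimal sketch, assuming C11 atomics and a `NETDATA_INTERNAL_CHECKS` build flag as the gate (the struct and function names are hypothetical, not netdata's actual implementation):

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* A spinlock that counts failed acquisition attempts when built with
 * internal checks, so lock contention shows up in debug statistics. */
typedef struct {
    atomic_flag locked;
#ifdef NETDATA_INTERNAL_CHECKS
    atomic_size_t spins; /* total spins across all waiters */
#endif
} spinlock_t;

#define SPINLOCK_INIT { .locked = ATOMIC_FLAG_INIT }

static void spinlock_lock(spinlock_t *sl) {
    while (atomic_flag_test_and_set_explicit(&sl->locked, memory_order_acquire)) {
#ifdef NETDATA_INTERNAL_CHECKS
        atomic_fetch_add(&sl->spins, 1); /* counted only in debug builds */
#endif
    }
}

static void spinlock_unlock(spinlock_t *sl) {
    atomic_flag_clear_explicit(&sl->locked, memory_order_release);
}
```

In release builds the counter (and its cache traffic) disappears entirely, which keeps the fast path identical to a plain test-and-set spinlock.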
* remove prioritization from dbengine queries; cache now uses mutexes for the queues
* hot pages are now in sections judy arrays, like dirty
* align replication queries to optimal page size
* during flushing add to clean and evict in batches
* Revert "during flushing add to clean and evict in batches"
This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7.
* dont lock clean while evicting pages during flushing
* Revert "dont lock clean while evicting pages during flushing"
This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3.
* Revert "Revert "during flushing add to clean and evict in batches""
This reverts commit ca7a187537fb8f743992700427e13042561211ec.
* don't cross locks during flushing, for the fastest flushes possible
* low-priority queries load pages synchronously
* Revert "low-priority queries load pages synchronously"
This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7.
* cache uses spinlock again
* during flushing, don't lock the clean queue at all; each item is added atomically
* do smaller eviction runs
* evict one page at a time to minimize lock contention on the clean queue
* fix eviction statistics
* fix last commit
* plain should be main cache
* event loop cleanup; evictions and flushes can now happen concurrently
* run flush and evictions from tier0 only
* remove unneeded variables
* flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time
* added worker jobs in timer to find the slow part of it
* support fast eviction of pages when all_of_them is set
* revert default thread stack size
* bypass event loop for dispatching read extent commands to workers - send them directly
* Revert "bypass event loop for dispatching read extent commands to workers - send them directly"
This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce.
* cache work requests
* minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors
* publish flushed pages to open cache in the thread pool
* prevent eventloop requests from getting stacked in the event loop
* single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c
* more rrdengine.c cleanup
* enable db rotation
* do not log when there is a filter
* do not run multiple migrations to journal v2
* load all extents async
* fix wrong paste
* report opcodes waiting, work requests dispatched, work requests executing
* cleanup event loop memory every 10 minutes
* don't dispatch more work requests than the number of threads available
* use the dispatched counter instead of the executing counter to check if the worker thread pool is full
* remove UV_RUN_NOWAIT
* replication to fill the queues
* caching of extent buffers; code cleanup
* caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation
* single transaction wal
* synchronous flushing
* first cancel the threads, then signal them to exit
* caching of rrdeng query handles; added priority to query target; health is now low prio
* add priority to the missing points; do not allow critical priority in queries
* offload query preparation and routing to libuv thread pool
* updated timing charts for the offloaded query preparation
* caching of WALs
* accounting for struct caches (buffers); do not load extents with invalid sizes
* protection against memory ballooning during replication due to the optimal alignment of pages; the sender thread buffer is now also reset when the circular buffer is reset
* also check that the expanded 'before' is not past the chart's last updated time
* also check that the expanded 'before' is not after the wall-clock time at which the query started
* Remove unused variable
* replication to queue fewer queries; cleanup of internal fatals
* Mark dimension to be updated async
* caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol)
* disable pgc stress test, under an ifdef
* disable mrg stress test under an ifdef
* Mark chart and host labels, host info for async check and store in the database
* dictionary items use arrayalloc
* cache section pages structure is allocated with arrayalloc
* Add function to wake up the aclk query threads and check for exit
Register function to be called during shutdown after signaling the service to exit
* parallel preparation of all dimensions of queries
* be more sensitive to enable streaming after replication
* atomically finish chart replication
* fix last commit
* fix last commit again
* fix last commit again again
* fix last commit again again again
* unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication
* do not cancel start streaming; use high priority queries when we have locked chart data collection
* prevent starvation of opcode execution by allowing 2% of the requests to be re-ordered
* opcodes now use 2 spinlocks: one for the caching of allocations and one for the waiting queue
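The 2% re-ordering rule above admits a very simple realization; the sketch below is one plausible shape (the function name and the "take from the other end of the queue" interpretation are assumptions, not netdata's actual code): serve the queue in order, but let every 50th dispatch (100 / 2%) break ordering so no class of requests can be starved indefinitely.

```c
#include <assert.h>
#include <stddef.h>

#define OPCODES_REORDER_PERCENT 2  /* at most 2% served out of order */

/* Returns non-zero when this dispatch is allowed to break FIFO order,
 * i.e. on every (100 / 2)th = 50th dispatch. */
static int opcode_may_reorder(size_t dispatched) {
    return (dispatched % (100 / OPCODES_REORDER_PERCENT)) == 0;
}
```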
* Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore
* Fix bad memory allocation / cleanup
* Cleanup ACLK sync initialization (part 1)
* Don't update metric registry during shutdown (part 1)
* Prevent crash when dashboard is refreshed and host goes away
* Mark ctx that is shutting down.
Test not adding flushed pages to open cache as hot if we are shutting down
* make ML work
* Fix compile without NETDATA_INTERNAL_CHECKS
* shutdown each ctx independently
* fix completion of quiesce
* do not update shared ML charts
* Create ML charts on child hosts.
When a parent runs ML for a child, the relevant ML charts
should be created on the child host. These charts use the
parent's hostname to differentiate between the multiple parents
that might run ML for the child.
The only exception to this rule is the training/prediction resource
usage charts. These are created on the localhost of the parent host,
because they provide information specific to said host.
* check new ml code
* first save the database, then free all memory
* dbengine prepares to exit before freeing all memory; fixed deadlock in cache hot-to-dirty; added missing check to the query engine for metrics without any data in the db
* Cleanup metadata thread (part 2)
* increase refcount before dispatching prep command
* Do not try to stop anomaly detection threads twice.
A separate function call has been added to stop anomaly detection threads.
This commit removes the left over function calls that were made
internally when a host was being created/destroyed.
* Remove allocations when smoothing samples buffer
The number of dims per sample is always 1, i.e., we are training and
predicting only individual dimensions.
* set the orphan flag when loading archived hosts
* track worker dispatch callbacks and threadpool worker init
* make ML threads joinable; mark ctx having flushing in progress as early as possible
* fix allocation counter
* Cleanup metadata thread (part 3)
* Cleanup metadata thread (part 4)
* Skip metadata host scan when running unittest
* unittest support during init
* don't use all the libuv threads for queries
* break an infinite loop when sleep_usec() is interrupted
* ml prediction is a collector for several charts
* sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit
* worker_unregister() in netdata threads cleanup
* moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h
* fixed ML issues
* removed engine2 directory
* added dbengine2 files in CMakeLists.txt
* move query plan data to query target, so that they can be exposed in jsonwrap
* uniform definition of query plan according to the other query target members
* event_loop should be in daemon, not libnetdata
* metric_retention_by_uuid() is now part of the storage engine abstraction
* unify time_t variables to have the suffix _s (meaning: seconds)
* old dbengine statistics become "dbengine io"
* do not enable ML resource usage charts by default
* unify ml chart families, plugins and modules
* cleanup query plans from query target
* cleanup all extent buffers
* added debug info for rrddim slot to time
* rrddim now does proper gap management
* full rewrite of the mem modes
* use library functions for madvise
* use CHECKSUM_SZ for the checksum size
* fix coverity warning about the impossible case of returning a page that is entirely in the past of the query
* fix dbengine shutdown
* keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently
* fine tune cache evictions
* don't initialize health if the health service is not running - prevents a crash on shutdown while children are connecting
* rename AS threads to ACLK[hostname]
* prevent re-use of uninitialized memory in queries
* use JulyL instead of JudyL for PDC operations - to test it first
* add also JulyL files
* fix July memory accounting
* disable July for PDC (use Judy)
* use the function to remove datafiles from linked list
* fix july and event_loop
* add july to libnetdata subdirs
* rename time_t variables that end in _t to end in _s
* replicate when there is a gap at the beginning of the replication period
* reset postponing of sender connections when a receiver is connected
* Adjust update every properly
* fix replication infinite loop due to last change
* packed enums in rrd.h and cleanup of obsolete rrd structure members
* prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests()
* void unused variable
* void unused variables
* fix indentation
* entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps
* macros to calculate page entries by time and size
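Such macros plausibly look like the sketch below — the names, the inclusive time range, and the fixed-size-point assumption are all hypothetical, not the actual dbengine definitions:

```c
#include <assert.h>

/* How many points a page covers, derived from its time range:
 * an inclusive [first, last] interval at a fixed update frequency. */
#define PAGE_ENTRIES_BY_TIME(first_s, last_s, update_every_s) \
    (((last_s) - (first_s)) / (update_every_s) + 1)

/* How many points a page holds, derived from its size,
 * assuming fixed-size points. */
#define PAGE_ENTRIES_BY_SIZE(size_bytes, point_size_bytes) \
    ((size_bytes) / (point_size_bytes))
```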
* prevent statsd cleanup crash on exit
* cleanup health thread related variables
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'ml')
-rw-r--r--   ml/ADCharts.cc       123
-rw-r--r--   ml/Dimension.cc        6
-rw-r--r--   ml/Dimension.h        10
-rw-r--r--   ml/Host.cc            62
-rw-r--r--   ml/Query.h             8
-rw-r--r--   ml/SamplesBuffer.cc   11
-rw-r--r--   ml/SamplesBuffer.h     4
-rw-r--r--   ml/ml.cc               4
8 files changed, 133 insertions, 95 deletions
diff --git a/ml/ADCharts.cc b/ml/ADCharts.cc
index d77b36612c..cbb13f5d19 100644
--- a/ml/ADCharts.cc
+++ b/ml/ADCharts.cc
@@ -21,16 +21,16 @@ void ml::updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS) {
     NameSS << "machine_learning_status_on_" << rrdhost_hostname(localhost);
 
     MachineLearningStatusRS = rrdset_create(
-            RH,
-            "netdata", // type
+        RH,
+        "netdata", // type
         IdSS.str().c_str(), // id
         NameSS.str().c_str(), // name
-        "ml", // family
+        NETDATA_ML_CHART_FAMILY, // family
         "netdata.machine_learning_status", // ctx
         "Machine learning status", // title
         "dimensions", // units
-        "netdata", // plugin
-        "ml", // module
+        NETDATA_ML_PLUGIN, // plugin
+        NETDATA_ML_MODULE_TRAINING, // module
         NETDATA_ML_CHART_PRIO_MACHINE_LEARNING_STATUS, // priority
         RH->rrd_update_every, // update_every
         RRDSET_TYPE_LINE // chart_type
@@ -65,16 +65,16 @@ void ml::updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS) {
     NameSS << "metric_types_on_" << rrdhost_hostname(localhost);
 
     MetricTypesRS = rrdset_create(
-            RH,
-            "netdata", // type
+        RH,
+        "netdata", // type
         IdSS.str().c_str(), // id
         NameSS.str().c_str(), // name
-        "ml", // family
+        NETDATA_ML_CHART_FAMILY, // family
         "netdata.metric_types", // ctx
         "Dimensions by metric type", // title
         "dimensions", // units
-        "netdata", // plugin
-        "ml", // module
+        NETDATA_ML_PLUGIN, // plugin
+        NETDATA_ML_MODULE_TRAINING, // module
         NETDATA_ML_CHART_PRIO_METRIC_TYPES, // priority
         RH->rrd_update_every, // update_every
         RRDSET_TYPE_LINE // chart_type
@@ -109,16 +109,16 @@ void ml::updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS) {
     NameSS << "training_status_on_" << rrdhost_hostname(localhost);
 
     TrainingStatusRS = rrdset_create(
-            RH,
-            "netdata", // type
+        RH,
+        "netdata", // type
         IdSS.str().c_str(), // id
         NameSS.str().c_str(), // name
-        "ml", // family
+        NETDATA_ML_CHART_FAMILY, // family
         "netdata.training_status", // ctx
         "Training status of dimensions", // title
         "dimensions", // units
-        "netdata", // plugin
-        "ml", // module
+        NETDATA_ML_PLUGIN, // plugin
+        NETDATA_ML_MODULE_TRAINING, // module
         NETDATA_ML_CHART_PRIO_TRAINING_STATUS, // priority
         RH->rrd_update_every, // update_every
         RRDSET_TYPE_LINE // chart_type
@@ -156,16 +156,16 @@ void ml::updateDimensionsChart(RRDHOST *RH, const MachineLearningStats &MLS) {
     NameSS << "dimensions_on_" << rrdhost_hostname(localhost);
 
     PredictionRS = rrdset_create(
-            RH,
-            "anomaly_detection", // type
+        RH,
+        "anomaly_detection", // type
         IdSS.str().c_str(), // id
         NameSS.str().c_str(), // name
         "dimensions", // family
         "anomaly_detection.dimensions", // ctx
         "Anomaly detection dimensions", // title
         "dimensions", // units
-        "netdata", // plugin
-        "ml", // module
+        NETDATA_ML_PLUGIN, // plugin
+        NETDATA_ML_MODULE_TRAINING, // module
         ML_CHART_PRIO_DIMENSIONS, // priority
         RH->rrd_update_every, // update_every
         RRDSET_TYPE_LINE // chart_type
@@ -195,16 +195,16 @@ void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyR
     NameSS << "anomaly_rate_on_" << rrdhost_hostname(localhost);
 
     HostRateRS = rrdset_create(
-            RH,
-            "anomaly_detection", // type
+        RH,
+        "anomaly_detection", // type
         IdSS.str().c_str(), // id
         NameSS.str().c_str(), // name
         "anomaly_rate", // family
         "anomaly_detection.anomaly_rate", // ctx
         "Percentage of anomalous dimensions", // title
         "percentage", // units
-        "netdata", // plugin
-        "ml", // module
+        NETDATA_ML_PLUGIN, // plugin
+        NETDATA_ML_MODULE_DETECTION, // module
         ML_CHART_PRIO_ANOMALY_RATE, // priority
         RH->rrd_update_every, // update_every
         RRDSET_TYPE_LINE // chart_type
@@ -229,16 +229,16 @@ void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyR
     NameSS << "anomaly_detection_on_" << rrdhost_hostname(localhost);
 
     AnomalyDetectionRS = rrdset_create(
-            RH,
-            "anomaly_detection", // type
+        RH,
+        "anomaly_detection", // type
         IdSS.str().c_str(), // id
         NameSS.str().c_str(), // name
         "anomaly_detection", // family
         "anomaly_detection.detector_events", // ctx
         "Anomaly detection events", // title
         "percentage", // units
-        "netdata", // plugin
-        "ml", // module
+        NETDATA_ML_PLUGIN, // plugin
+        NETDATA_ML_MODULE_DETECTION, // module
         ML_CHART_PRIO_DETECTOR_EVENTS, // priority
         RH->rrd_update_every, // update_every
         RRDSET_TYPE_LINE // chart_type
@@ -271,20 +271,21 @@ void ml::updateHostAndDetectionRateCharts(RRDHOST *RH, collected_number AnomalyR
             NULL /* group options */,
             0, /* timeout */
             0, /* tier */
-            QUERY_SOURCE_ML
+            QUERY_SOURCE_ML,
+            STORAGE_PRIORITY_BEST_EFFORT
     );
 
     if(R) {
-        assert(R->d == 1 && R->n == 1 && R->rows == 1);
-
-        static thread_local bool PrevAboveThreshold = false;
-        bool AboveThreshold = R->v[0] >= Cfg.HostAnomalyRateThreshold;
-        bool NewAnomalyEvent = AboveThreshold && !PrevAboveThreshold;
-        PrevAboveThreshold = AboveThreshold;
-
-        rrddim_set_by_pointer(AnomalyDetectionRS, AboveThresholdRD, AboveThreshold);
-        rrddim_set_by_pointer(AnomalyDetectionRS, NewAnomalyEventRD, NewAnomalyEvent);
-        rrdset_done(AnomalyDetectionRS);
+        if(R->d == 1 && R->n == 1 && R->rows == 1) {
+            static thread_local bool PrevAboveThreshold = false;
+            bool AboveThreshold = R->v[0] >= Cfg.HostAnomalyRateThreshold;
+            bool NewAnomalyEvent = AboveThreshold && !PrevAboveThreshold;
+            PrevAboveThreshold = AboveThreshold;
+
+            rrddim_set_by_pointer(AnomalyDetectionRS, AboveThresholdRD, AboveThreshold);
+            rrddim_set_by_pointer(AnomalyDetectionRS, NewAnomalyEventRD, NewAnomalyEvent);
+            rrdset_done(AnomalyDetectionRS);
+        }
 
         rrdr_free(OWA, R);
     }
@@ -309,15 +310,15 @@ void ml::updateResourceUsageCharts(RRDHOST *RH, const struct rusage &PredictionR
     NameSS << "prediction_usage_for_" << rrdhost_hostname(RH);
 
     RS = rrdset_create_localhost(
-            "netdata", // type
+        "netdata", // type
         IdSS.str().c_str(), // id
         NameSS.str().c_str(), // name
-        "ml", // family
+        NETDATA_ML_CHART_FAMILY, // family
         "netdata.prediction_usage", // ctx
         "Prediction resource usage", // title
         "milliseconds/s", // units
-        "netdata", // plugin
-        "ml", // module
+        NETDATA_ML_PLUGIN, // plugin
+        NETDATA_ML_MODULE_PREDICTION, // module
         NETDATA_ML_CHART_PRIO_PREDICTION_USAGE, // priority
         RH->rrd_update_every, // update_every
         RRDSET_TYPE_STACKED // chart_type
@@ -350,15 +351,15 @@ void ml::updateResourceUsageCharts(RRDHOST *RH, const struct rusage &PredictionR
     NameSS << "training_usage_for_" << rrdhost_hostname(RH);
 
     RS = rrdset_create_localhost(
-            "netdata", // type
+        "netdata", // type
         IdSS.str().c_str(), // id
         NameSS.str().c_str(), // name
-        "ml", // family
+        NETDATA_ML_CHART_FAMILY, // family
         "netdata.training_usage", // ctx
         "Training resource usage", // title
         "milliseconds/s", // units
-        "netdata", // plugin
-        "ml", // module
+        NETDATA_ML_PLUGIN, // plugin
+        NETDATA_ML_MODULE_TRAINING, // module
         NETDATA_ML_CHART_PRIO_TRAINING_USAGE, // priority
         RH->rrd_update_every, // update_every
         RRDSET_TYPE_STACKED // chart_type
@@ -393,16 +394,16 @@ void ml::updateTrainingStatisticsChart(RRDHOST *RH, const TrainingStats &TS) {
    NameSS << "queue_stats_on_" << rrdhost_hostname(localhost);
 
    RS = rrdset_create(
-            RH,
-            "netdata", // type
+        RH,
+        "netdata", // type
         IdSS.str().c_str(), // id
         NameSS.str().c_str(), // name
-        "ml", // family
+        NETDATA_ML_CHART_FAMILY, // family
         "netdata.queue_stats", // ctx
         "Training queue stats", // title
         "items", // units
-        "netdata", // plugin
-        "ml", // module
+        NETDATA_ML_PLUGIN, // plugin
+        NETDATA_ML_MODULE_TRAINING, // module
         NETDATA_ML_CHART_PRIO_QUEUE_STATS, // priority
         RH->rrd_update_every, // update_every
         RRDSET_TYPE_LINE// chart_type
@@ -436,16 +437,16 @@ void ml::updateTrainingStatisticsChart(RRDHOST *RH, const TrainingStats &TS) {
    NameSS << "training_time_stats_on_" << rrdhost_hostname(localhost);
 
    RS = rrdset_create(
-            RH,
-            "netdata", // type
+        RH,
+        "netdata", // type
         IdSS.str().c_str(), // id
         NameSS.str().c_str(), // name
-        "ml", // family
+        NETDATA_ML_CHART_FAMILY, // family
         "netdata.training_time_stats", // ctx
         "Training time stats", // title
         "milliseconds", // units
-        "netdata", // plugin
-        "ml", // module
+        NETDATA_ML_PLUGIN, // plugin
+        NETDATA_ML_MODULE_TRAINING, // module
         NETDATA_ML_CHART_PRIO_TRAINING_TIME_STATS, // priority
         RH->rrd_update_every, // update_every
         RRDSET_TYPE_LINE// chart_type
@@ -483,16 +484,16 @@ void ml::updateTrainingStatisticsChart(RRDHOST *RH, const TrainingStats &TS) {
    NameSS << "training_results_on_" << rrdhost_hostname(localhost);
 
    RS = rrdset_create(
-            RH,
-            "netdata", // type
+        RH,
+        "netdata", // type
         IdSS.str().c_str(), // id
         NameSS.str().c_str(), // name
-        "ml", // family
+        NETDATA_ML_CHART_FAMILY, // family
         "netdata.training_results", // ctx
         "Training results", // title
         "events", // units
-        "netdata", // plugin
-        "ml", // module
+        NETDATA_ML_PLUGIN, // plugin
+        NETDATA_ML_MODULE_TRAINING, // module
         NETDATA_ML_CHART_PRIO_TRAINING_RESULTS, // priority
         RH->rrd_update_every, // update_every
         RRDSET_TYPE_LINE// chart_type
diff --git a/ml/Dimension.cc b/ml/Dimension.cc
index ef324a0736..62699aaad7 100644
--- a/ml/Dimension.cc
+++ b/ml/Dimension.cc
@@ -70,8 +70,8 @@ std::pair<CalculatedNumber *, TrainingResponse> Dimension::getCalculatedNumbers(
     TrainingResp.FirstEntryOnRequest = TrainingReq.FirstEntryOnRequest;
     TrainingResp.LastEntryOnRequest = TrainingReq.LastEntryOnRequest;
 
-    TrainingResp.FirstEntryOnResponse = rrddim_first_entry_t_of_tier(RD, 0);
-    TrainingResp.LastEntryOnResponse = rrddim_last_entry_t_of_tier(RD, 0);
+    TrainingResp.FirstEntryOnResponse = rrddim_first_entry_s_of_tier(RD, 0);
+    TrainingResp.LastEntryOnResponse = rrddim_last_entry_s_of_tier(RD, 0);
 
     size_t MinN = Cfg.MinTrainSamples;
     size_t MaxN = Cfg.MaxTrainSamples;
@@ -195,7 +195,7 @@ TrainingResult Dimension::trainModel(const TrainingRequest &TrainingReq) {
         MT = MetricType::Constant;
         TS = TrainingStatus::Trained;
         TR = TrainingResp;
-        LastTrainingTime = rrddim_last_entry_t(RD);
+        LastTrainingTime = rrddim_last_entry_s(RD);
     }
 
     delete[] CNs;
diff --git a/ml/Dimension.h b/ml/Dimension.h
index 4d06c9372d..2b1adfff9e 100644
--- a/ml/Dimension.h
+++ b/ml/Dimension.h
@@ -166,11 +166,11 @@ public:
 private:
     TrainingRequest getTrainingRequest(time_t CurrT) const {
         return TrainingRequest {
-                string_dup(RD->rrdset->id),
-                string_dup(RD->id),
-                CurrT,
-                rrddim_first_entry_t(RD),
-                rrddim_last_entry_t(RD)
+            string_dup(RD->rrdset->id),
+            string_dup(RD->id),
+            CurrT,
+            rrddim_first_entry_s(RD),
+            rrddim_last_entry_s(RD)
         };
     }
diff --git a/ml/Host.cc b/ml/Host.cc
index fcf97b2b8d..5980f610d8 100644
--- a/ml/Host.cc
+++ b/ml/Host.cc
@@ -54,7 +54,15 @@ void Host::getModelsAsJson(nlohmann::json &Json) {
     }
 }
 
+#define WORKER_JOB_DETECTION_PREP 0
+#define WORKER_JOB_DETECTION_DIM_CHART 1
+#define WORKER_JOB_DETECTION_HOST_CHART 2
+#define WORKER_JOB_DETECTION_STATS 3
+#define WORKER_JOB_DETECTION_RESOURCES 4
+
 void Host::detectOnce() {
+    worker_is_busy(WORKER_JOB_DETECTION_PREP);
+
     MLS = {};
     MachineLearningStats MLSCopy = {};
     TrainingStats TSCopy = {};
@@ -134,13 +142,20 @@ void Host::detectOnce() {
         TSCopy.RemainingUT = 0;
     }
 
+    worker_is_busy(WORKER_JOB_DETECTION_DIM_CHART);
     updateDimensionsChart(RH, MLSCopy);
+
+    worker_is_busy(WORKER_JOB_DETECTION_HOST_CHART);
     updateHostAndDetectionRateCharts(RH, HostAnomalyRate * 10000.0);
 
+#ifdef NETDATA_ML_RESOURCE_CHARTS
+    worker_is_busy(WORKER_JOB_DETECTION_RESOURCES);
     struct rusage PredictionRU;
     getrusage(RUSAGE_THREAD, &PredictionRU);
     updateResourceUsageCharts(RH, PredictionRU, TSCopy.TrainingRU);
+#endif
 
+    worker_is_busy(WORKER_JOB_DETECTION_STATS);
     updateTrainingStatisticsChart(RH, TSCopy);
 }
 
@@ -150,7 +165,6 @@ public:
         RRDDIM_ACQUIRED *AcqRD = nullptr;
         Dimension *D = nullptr;
 
-        rrdhost_rdlock(RH);
         RRDSET *RS = rrdset_find(RH, string2str(ChartId));
         if (RS) {
             AcqRD = rrddim_find_and_acquire(RS, string2str(DimensionId));
@@ -160,7 +174,6 @@ public:
                 D = reinterpret_cast<Dimension *>(RD->ml_dimension);
             }
         }
-        rrdhost_unlock(RH);
 
         return AcquiredDimension(AcqRD, D);
     }
@@ -190,8 +203,19 @@ void Host::scheduleForTraining(TrainingRequest TR) {
     TrainingQueue.push(TR);
 }
 
+#define WORKER_JOB_TRAINING_FIND 0
+#define WORKER_JOB_TRAINING_TRAIN 1
+#define WORKER_JOB_TRAINING_STATS 2
+
 void Host::train() {
-    while (!netdata_exit) {
+    worker_register("MLTRAIN");
+    worker_register_job_name(WORKER_JOB_TRAINING_FIND, "find");
+    worker_register_job_name(WORKER_JOB_TRAINING_TRAIN, "train");
+    worker_register_job_name(WORKER_JOB_TRAINING_STATS, "stats");
+
+    service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true);
+
+    while (service_running(SERVICE_ML_TRAINING)) {
         auto P = TrainingQueue.pop();
         TrainingRequest TrainingReq = P.first;
         size_t Size = P.second;
@@ -200,15 +224,21 @@ void Host::train() {
         if (AllottedUT > USEC_PER_SEC)
             AllottedUT = USEC_PER_SEC;
 
-        usec_t StartUT = now_realtime_usec();
+        usec_t StartUT = now_monotonic_usec();
         TrainingResult TrainingRes;
         {
+            worker_is_busy(WORKER_JOB_TRAINING_FIND);
             AcquiredDimension AcqDim = AcquiredDimension::find(RH, TrainingReq.ChartId, TrainingReq.DimensionId);
+
+            worker_is_busy(WORKER_JOB_TRAINING_TRAIN);
             TrainingRes = AcqDim.train(TrainingReq);
+
             string_freez(TrainingReq.ChartId);
             string_freez(TrainingReq.DimensionId);
         }
-        usec_t ConsumedUT = now_realtime_usec() - StartUT;
+        usec_t ConsumedUT = now_monotonic_usec() - StartUT;
+
+        worker_is_busy(WORKER_JOB_TRAINING_STATS);
 
         usec_t RemainingUT = 0;
         if (ConsumedUT < AllottedUT)
@@ -249,15 +279,27 @@ void Host::train() {
         }
     }
 
+        worker_is_idle();
         std::this_thread::sleep_for(std::chrono::microseconds{RemainingUT});
+        worker_is_busy(0);
     }
 }
 
 void Host::detect() {
+    worker_register("MLDETECT");
+    worker_register_job_name(WORKER_JOB_DETECTION_PREP, "prep");
+    worker_register_job_name(WORKER_JOB_DETECTION_DIM_CHART, "dim chart");
+    worker_register_job_name(WORKER_JOB_DETECTION_HOST_CHART, "host chart");
+    worker_register_job_name(WORKER_JOB_DETECTION_STATS, "stats");
+    worker_register_job_name(WORKER_JOB_DETECTION_RESOURCES, "resources");
+
+    service_register(SERVICE_THREAD_TYPE_NETDATA, NULL, (force_quit_t )ml_stop_anomaly_detection_threads, RH, true);
+
     heartbeat_t HB;
     heartbeat_init(&HB);
 
-    while (!netdata_exit) {
+    while (service_running((SERVICE_TYPE)(SERVICE_ML_PREDICTION | SERVICE_COLLECTORS))) {
+        worker_is_idle();
         heartbeat_next(&HB, RH->rrd_update_every * USEC_PER_SEC);
         detectOnce();
     }
@@ -294,10 +336,10 @@ void Host::startAnomalyDetectionThreads() {
     char Tag[NETDATA_THREAD_TAG_MAX + 1];
 
     snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "TRAIN[%s]", rrdhost_hostname(RH));
-    netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this));
+    netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, train_main, static_cast<void *>(this));
 
     snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "DETECT[%s]", rrdhost_hostname(RH));
-    netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this));
+    netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, detect_main, static_cast<void *>(this));
 }
 
 void Host::stopAnomalyDetectionThreads() {
@@ -311,8 +353,8 @@ void Host::stopAnomalyDetectionThreads() {
     // Signal the training queue to stop popping-items
     TrainingQueue.signal();
     netdata_thread_cancel(TrainingThread);
-    netdata_thread_join(TrainingThread, nullptr);
+    // netdata_thread_join(TrainingThread, nullptr);
 
     netdata_thread_cancel(DetectionThread);
-    netdata_thread_join(DetectionThread, nullptr);
+    // netdata_thread_join(DetectionThread, nullptr);
 }
diff --git a/ml/Query.h b/ml/Query.h
index 78e99db350..bfc8e023be 100644
--- a/ml/Query.h
+++ b/ml/Query.h
@@ -12,15 +12,15 @@ public:
     }
 
     time_t latestTime() {
-        return Ops->latest_time(RD->tiers[0]->db_metric_handle);
+        return Ops->latest_time_s(RD->tiers[0]->db_metric_handle);
     }
 
     time_t oldestTime() {
-        return Ops->oldest_time(RD->tiers[0]->db_metric_handle);
+        return Ops->oldest_time_s(RD->tiers[0]->db_metric_handle);
     }
 
     void init(time_t AfterT, time_t BeforeT) {
-        Ops->init(RD->tiers[0]->db_metric_handle, &Handle, AfterT, BeforeT);
+        Ops->init(RD->tiers[0]->db_metric_handle, &Handle, AfterT, BeforeT, STORAGE_PRIORITY_BEST_EFFORT);
         Initialized = true;
         points_read = 0;
     }
@@ -40,7 +40,7 @@ public:
     std::pair<time_t, CalculatedNumber> nextMetric() {
         points_read++;
         STORAGE_POINT sp = Ops->next_metric(&Handle);
-        return { sp.end_time, sp.sum / sp.count };
+        return { sp.end_time_s, sp.sum / sp.count };
     }
 
 private:
diff --git a/ml/SamplesBuffer.cc b/ml/SamplesBuffer.cc
index 8fb9246e37..6278e27891 100644
--- a/ml/SamplesBuffer.cc
+++ b/ml/SamplesBuffer.cc
@@ -54,12 +54,12 @@ void SamplesBuffer::diffSamples() {
 
 void SamplesBuffer::smoothSamples() {
     // Holds the mean value of each window
-    CalculatedNumber *AccCNs = new CalculatedNumber[NumDimsPerSample]();
-    Sample Acc(AccCNs, NumDimsPerSample);
+    CalculatedNumber AccCNs[1] = { 0 };
+    Sample Acc(AccCNs, 1);
 
     // Used to avoid clobbering the accumulator when moving the window
-    CalculatedNumber *TmpCNs = new CalculatedNumber[NumDimsPerSample]();
-    Sample Tmp(TmpCNs, NumDimsPerSample);
+    CalculatedNumber TmpCNs[1] = { 0 };
+    Sample Tmp(TmpCNs, 1);
 
     CalculatedNumber Factor = (CalculatedNumber) 1 / SmoothN;
 
@@ -88,9 +88,6 @@ void SamplesBuffer::smoothSamples() {
         Acc.copy(Tmp);
         Acc.scale(Factor);
     }
-
-    delete[] AccCNs;
-    delete[] TmpCNs;
 }
 
 void SamplesBuffer::lagSamples() {
diff --git a/ml/SamplesBuffer.h b/ml/SamplesBuffer.h
index 9edc710192..ca60f4b916 100644
--- a/ml/SamplesBuffer.h
+++ b/ml/SamplesBuffer.h
@@ -86,7 +86,9 @@ public:
         DiffN(DiffN), SmoothN(SmoothN), LagN(LagN),
         SamplingRatio(SamplingRatio), RandNums(RandNums),
         BytesPerSample(NumDimsPerSample * sizeof(CalculatedNumber)),
-        Preprocessed(false) {};
+        Preprocessed(false) {
+            assert(NumDimsPerSample == 1 && "SamplesBuffer supports only one dimension per sample");
+        };
 
     void preprocess(std::vector<DSample> &Samples);
     void preprocess(DSample &Feature);
diff --git a/ml/ml.cc b/ml/ml.cc
--- a/ml/ml.cc
+++ b/ml/ml.cc
@@ -52,8 +52,6 @@ void ml_host_new(RRDHOST *RH) {
 
     Host *H = new Host(RH);
     RH->ml_host = reinterpret_cast<ml_host_t *>(H);
-
-    H->startAnomalyDetectionThreads();
 }
 
 void ml_host_delete(RRDHOST *RH) {
@@ -61,8 +59,6 @@ void ml_host_delete(RRDHOST *RH) {
     if (!H)
         return;
 
-    H->stopAnomalyDetectionThreads();
-
     delete H;
     RH->ml_host = nullptr;
 }