diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-10 19:59:21 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-10 19:59:21 +0200 |
commit | 368a26cfee6887ca0cb2301d93138f63b75e353a (patch) | |
tree | b57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /collectors | |
parent | b513888be389f92b2323d1bb3fdf55c22d4e4bad (diff) |
DBENGINE v2 (#14125)
* count open cache pages refering to datafile
* eliminate waste flush attempts
* remove eliminated variable
* journal v2 scanning split functions
* avoid locking open cache for a long time while migrating to journal v2
* dont acquire datafile for the loop; disable thread cancelability while a query is running
* work on datafile acquiring
* work on datafile deletion
* work on datafile deletion again
* logs of dbengine should start with DBENGINE
* thread specific key for queries to check if a query finishes without a finalize
* page_uuid is not used anymore
* Cleanup judy traversal when building new v2
Remove not needed calls to metric registry
* metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent
* disable checks for invalid time-ranges
* Remove type from page details
* report scanning time
* remove infinite loop from datafile acquire for deletion
* remove infinite loop from datafile acquire for deletion again
* trace query handles
* properly allocate array of dimensions in replication
* metrics cleanup
* metrics registry uses arrayalloc
* arrayalloc free should be protected by lock
* use array alloc in page cache
* journal v2 scanning fix
* datafile reference leaking hunding
* do not load metrics of future timestamps
* initialize reasons
* fix datafile reference leak
* do not load pages that are entirely overlapped by others
* expand metric retention atomically
* split replication logic in initialization and execution
* replication prepare ahead queries
* replication prepare ahead queries fixed
* fix replication workers accounting
* add router active queries chart
* restore accounting of pages metadata sources; cleanup replication
* dont count skipped pages as unroutable
* notes on services shutdown
* do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file
* do not add pages we dont need to pdc
* time in range re-work to provide info about past and future matches
* finner control on the pages selected for processing; accounting of page related issues
* fix invalid reference to handle->page
* eliminate data collection handle of pg_lookup_next
* accounting for queries with gaps
* query preprocessing the same way the processing is done; cache now supports all operations on Judy
* dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3)
* get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query
* finner gaps calculation; accounting of overlapping pages in queries
* fix gaps accounting
* move datafile deletion to worker thread
* tune libuv workers and thread stack size
* stop netdata threads gradually
* run indexing together with cache flush/evict
* more work on clean shutdown
* limit the number of pages to evict per run
* do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction
* economies on flags for smaller page footprint; cleanup and renames
* eviction moves referenced pages to the end of the queue
* use murmur hash for indexing partition
* murmur should be static
* use more indexing partitions
* revert number of partitions to number of cpus
* cancel threads first, then stop services
* revert default thread stack size
* dont execute replication requests of disconnected senders
* wait more time for services that are exiting gradually
* fixed last commit
* finer control on page selection algorithm
* default stacksize of 1MB
* fix formatting
* fix worker utilization going crazy when the number is rotating
* avoid buffer full due to replication preprocessing of requests
* support query priorities
* add count of spins in spinlock when compiled with netdata internal checks
* remove prioritization from dbengine queries; cache now uses mutexes for the queues
* hot pages are now in sections judy arrays, like dirty
* align replication queries to optimal page size
* during flushing add to clean and evict in batches
* Revert "during flushing add to clean and evict in batches"
This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7.
* dont lock clean while evicting pages during flushing
* Revert "dont lock clean while evicting pages during flushing"
This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3.
* Revert "Revert "during flushing add to clean and evict in batches""
This reverts commit ca7a187537fb8f743992700427e13042561211ec.
* dont cross locks during flushing, for the fastest flushes possible
* low-priority queries load pages synchronously
* Revert "low-priority queries load pages synchronously"
This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7.
* cache uses spinlock again
* during flushing, dont lock the clean queue at all; each item is added atomically
* do smaller eviction runs
* evict one page at a time to minimize lock contention on the clean queue
* fix eviction statistics
* fix last commit
* plain should be main cache
* event loop cleanup; evictions and flushes can now happen concurrently
* run flush and evictions from tier0 only
* remove not needed variables
* flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time
* added worker jobs in timer to find the slow part of it
* support fast eviction of pages when all_of_them is set
* revert default thread stack size
* bypass event loop for dispatching read extent commands to workers - send them directly
* Revert "bypass event loop for dispatching read extent commands to workers - send them directly"
This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce.
* cache work requests
* minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors
* publish flushed pages to open cache in the thread pool
* prevent eventloop requests from getting stacked in the event loop
* single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c
* more rrdengine.c cleanup
* enable db rotation
* do not log when there is a filter
* do not run multiple migration to journal v2
* load all extents async
* fix wrong paste
* report opcodes waiting, works dispatched, works executing
* cleanup event loop memory every 10 minutes
* dont dispatch more work requests than the number of threads available
* use the dispatched counter instead of the executing counter to check if the worker thread pool is full
* remove UV_RUN_NOWAIT
* replication to fill the queues
* caching of extent buffers; code cleanup
* caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation
* single transaction wal
* synchronous flushing
* first cancel the threads, then signal them to exit
* caching of rrdeng query handles; added priority to query target; health is now low prio
* add priority to the missing points; do not allow critical priority in queries
* offload query preparation and routing to libuv thread pool
* updated timing charts for the offloaded query preparation
* caching of WALs
* accounting for struct caches (buffers); do not load extents with invalid sizes
* protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset
* also check if the expanded before is not the chart later updated time
* also check if the expanded before is not after the wall clock time of when the query started
* Remove unused variable
* replication to queue less queries; cleanup of internal fatals
* Mark dimension to be updated async
* caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol)
* disable pgc stress test, under an ifdef
* disable mrg stress test under an ifdef
* Mark chart and host labels, host info for async check and store in the database
* dictionary items use arrayalloc
* cache section pages structure is allocated with arrayalloc
* Add function to wakeup the aclk query threads and check for exit
Register function to be called during shutdown after signaling the service to exit
* parallel preparation of all dimensions of queries
* be more sensitive to enable streaming after replication
* atomically finish chart replication
* fix last commit
* fix last commit again
* fix last commit again again
* fix last commit again again again
* unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication
* do not cancel start streaming; use high priority queries when we have locked chart data collection
* prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered
* opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue
* Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore
* Fix bad memory allocation / cleanup
* Cleanup ACLK sync initialization (part 1)
* Don't update metric registry during shutdown (part 1)
* Prevent crash when dashboard is refreshed and host goes away
* Mark ctx that is shutting down.
Test not adding flushed pages to open cache as hot if we are shutting down
* make ML work
* Fix compile without NETDATA_INTERNAL_CHECKS
* shutdown each ctx independently
* fix completion of quiesce
* do not update shared ML charts
* Create ML charts on child hosts.
When a parent runs a ML for a child, the relevant-ML charts
should be created on the child host. These charts should use
the parent's hostname to differentiate multiple parents that might
run ML for a child.
The only exception to this rule is the training/prediction resource
usage charts. These are created on the localhost of the parent host,
because they provide information specific to said host.
* check new ml code
* first save the database, then free all memory
* dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db
* Cleanup metadata thread (part 2)
* increase refcount before dispatching prep command
* Do not try to stop anomaly detection threads twice.
A separate function call has been added to stop anomaly detection threads.
This commit removes the left over function calls that were made
internally when a host was being created/destroyed.
* Remove allocations when smoothing samples buffer
The number of dims per sample is always 1, ie. we are training and
predicting only individual dimensions.
* set the orphan flag when loading archived hosts
* track worker dispatch callbacks and threadpool worker init
* make ML threads joinable; mark ctx having flushing in progress as early as possible
* fix allocation counter
* Cleanup metadata thread (part 3)
* Cleanup metadata thread (part 4)
* Skip metadata host scan when running unittest
* unittest support during init
* dont use all the libuv threads for queries
* break an infinite loop when sleep_usec() is interrupted
* ml prediction is a collector for several charts
* sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit
* worker_unregister() in netdata threads cleanup
* moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h
* fixed ML issues
* removed engine2 directory
* added dbengine2 files in CMakeLists.txt
* move query plan data to query target, so that they can be exposed by in jsonwrap
* uniform definition of query plan according to the other query target members
* event_loop should be in daemon, not libnetdata
* metric_retention_by_uuid() is now part of the storage engine abstraction
* unify time_t variables to have the suffix _s (meaning: seconds)
* old dbengine statistics become "dbengine io"
* do not enable ML resource usage charts by default
* unify ml chart families, plugins and modules
* cleanup query plans from query target
* cleanup all extent buffers
* added debug info for rrddim slot to time
* rrddim now does proper gap management
* full rewrite of the mem modes
* use library functions for madvise
* use CHECKSUM_SZ for the checksum size
* fix coverity warning about the impossible case of returning a page that is entirely in the past of the query
* fix dbengine shutdown
* keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently
* fine tune cache evictions
* dont initialize health if the health service is not running - prevent crash on shutdown while children get connected
* rename AS threads to ACLK[hostname]
* prevent re-use of uninitialized memory in queries
* use JulyL instead of JudyL for PDC operations - to test it first
* add also JulyL files
* fix July memory accounting
* disable July for PDC (use Judy)
* use the function to remove datafiles from linked list
* fix july and event_loop
* add july to libnetdata subdirs
* rename time_t variables that end in _t to end in _s
* replicate when there is a gap at the beginning of the replication period
* reset postponing of sender connections when a receiver is connected
* Adjust update every properly
* fix replication infinite loop due to last change
* packed enums in rrd.h and cleanup of obsolete rrd structure members
* prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests()
* void unused variable
* void unused variables
* fix indentation
* entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps
* macros to caclulate page entries by time and size
* prevent statsd cleanup crash on exit
* cleanup health thread related variables
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'collectors')
-rw-r--r-- | collectors/all.h | 7 | ||||
-rw-r--r-- | collectors/cgroups.plugin/sys_fs_cgroup.c | 27 | ||||
-rw-r--r-- | collectors/diskspace.plugin/plugin_diskspace.c | 16 | ||||
-rw-r--r-- | collectors/idlejitter.plugin/plugin_idlejitter.c | 2 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.c | 8 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 26 | ||||
-rw-r--r-- | collectors/proc.plugin/plugin_proc.c | 6 | ||||
-rw-r--r-- | collectors/proc.plugin/proc_net_dev.c | 4 | ||||
-rw-r--r-- | collectors/statsd.plugin/statsd.c | 29 | ||||
-rw-r--r-- | collectors/tc.plugin/plugin_tc.c | 6 | ||||
-rw-r--r-- | collectors/timex.plugin/plugin_timex.c | 2 |
11 files changed, 83 insertions, 50 deletions
diff --git a/collectors/all.h b/collectors/all.h index 8774eb702a..958ed7c214 100644 --- a/collectors/all.h +++ b/collectors/all.h @@ -382,4 +382,11 @@ #define NETDATA_ML_CHART_PRIO_TRAINING_TIME_STATS 890007 #define NETDATA_ML_CHART_PRIO_TRAINING_RESULTS 890008 +#define NETDATA_ML_CHART_FAMILY "ml - machine learning" +#define NETDATA_ML_PLUGIN "ml.plugin" +#define NETDATA_ML_MODULE_TRAINING "training" +#define NETDATA_ML_MODULE_DETECTION "detection" +#define NETDATA_ML_MODULE_PREDICTION "prediction" + + #endif //NETDATA_ALL_H diff --git a/collectors/cgroups.plugin/sys_fs_cgroup.c b/collectors/cgroups.plugin/sys_fs_cgroup.c index 8f7548286c..f9dc085b46 100644 --- a/collectors/cgroups.plugin/sys_fs_cgroup.c +++ b/collectors/cgroups.plugin/sys_fs_cgroup.c @@ -2754,10 +2754,20 @@ static inline void discovery_find_all_cgroups() { debug(D_CGROUP, "done searching for cgroups"); } +static void cgroup_discovery_cleanup(void *ptr) { + UNUSED(ptr); + + discovery_thread.exited = 1; + worker_unregister(); + service_exits(); +} + void cgroup_discovery_worker(void *ptr) { UNUSED(ptr); + netdata_thread_cleanup_push(cgroup_discovery_cleanup, ptr); + worker_register("CGROUPSDISC"); worker_register_job_name(WORKER_DISCOVERY_INIT, "init"); worker_register_job_name(WORKER_DISCOVERY_FIND, "find"); @@ -2777,24 +2787,23 @@ void cgroup_discovery_worker(void *ptr) NULL, SIMPLE_PATTERN_EXACT); - while (!netdata_exit) { + while (service_running(SERVICE_COLLECTORS)) { worker_is_idle(); uv_mutex_lock(&discovery_thread.mutex); - while (!discovery_thread.start_discovery) + while (!discovery_thread.start_discovery && service_running(SERVICE_COLLECTORS)) uv_cond_wait(&discovery_thread.cond_var, &discovery_thread.mutex); discovery_thread.start_discovery = 0; uv_mutex_unlock(&discovery_thread.mutex); - if (unlikely(netdata_exit)) + if (unlikely(!service_running(SERVICE_COLLECTORS))) break; discovery_find_all_cgroups(); } - discovery_thread.exited = 1; - worker_unregister(); -} + netdata_thread_cleanup_pop(1); +} // ---------------------------------------------------------------------------- // generate charts @@ -4853,11 +4862,11 @@ void *cgroups_main(void *ptr) { usec_t step = cgroup_update_every * USEC_PER_SEC; usec_t find_every = cgroup_check_for_new_every * USEC_PER_SEC, find_dt = 0; - while(!netdata_exit) { + while(service_running(SERVICE_COLLECTORS)) { worker_is_idle(); usec_t hb_dt = heartbeat_next(&hb, step); - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; find_dt += hb_dt; if (unlikely(find_dt >= find_every || (!is_inside_k8s && cgroups_check))) { @@ -4872,9 +4881,11 @@ void *cgroups_main(void *ptr) { worker_is_busy(WORKER_CGROUPS_READ); read_all_discovered_cgroups(cgroup_root); + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; worker_is_busy(WORKER_CGROUPS_CHART); update_cgroup_charts(cgroup_update_every); + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; worker_is_idle(); uv_mutex_unlock(&cgroup_root_mutex); diff --git a/collectors/diskspace.plugin/plugin_diskspace.c b/collectors/diskspace.plugin/plugin_diskspace.c index e806a33602..81604e2048 100644 --- a/collectors/diskspace.plugin/plugin_diskspace.c +++ b/collectors/diskspace.plugin/plugin_diskspace.c @@ -515,7 +515,7 @@ void *diskspace_slow_worker(void *ptr) heartbeat_t hb; heartbeat_init(&hb); - while(!netdata_exit) { + while(service_running(SERVICE_COLLECTORS)) { worker_is_idle(); heartbeat_next(&hb, USEC_PER_SEC); @@ -530,7 +530,7 @@ void *diskspace_slow_worker(void *ptr) if (!dict_mountpoints) continue; - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; // -------------------------------------------------------------------------- // disk space metrics @@ -547,10 +547,10 @@ void *diskspace_slow_worker(void *ptr) for(bmi = slow_mountinfo_root; bmi; bmi = bmi->next) { do_slow_disk_space_stats(bmi, slow_update_every); - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; } - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; worker_is_busy(WORKER_JOB_SLOW_CLEANUP); @@ -640,11 +640,11 @@ void *diskspace_main(void *ptr) { usec_t step = update_every * USEC_PER_SEC; heartbeat_t hb; heartbeat_init(&hb); - while(!netdata_exit) { + while(service_running(SERVICE_COLLECTORS)) { worker_is_idle(); /* usec_t hb_dt = */ heartbeat_next(&hb, step); - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; // -------------------------------------------------------------------------- // this is smart enough not to reload it every time @@ -671,11 +671,11 @@ void *diskspace_main(void *ptr) { worker_is_busy(WORKER_JOB_MOUNTPOINT); do_disk_space_stats(mi, update_every); - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; } netdata_mutex_unlock(&slow_mountinfo_mutex); - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; if(dict_mountpoints) { worker_is_busy(WORKER_JOB_CLEANUP); diff --git a/collectors/idlejitter.plugin/plugin_idlejitter.c b/collectors/idlejitter.plugin/plugin_idlejitter.c index b6339cc0fc..df0f9b9515 100644 --- a/collectors/idlejitter.plugin/plugin_idlejitter.c +++ b/collectors/idlejitter.plugin/plugin_idlejitter.c @@ -48,7 +48,7 @@ void *cpuidlejitter_main(void *ptr) { usec_t update_every_ut = localhost->rrd_update_every * USEC_PER_SEC; struct timeval before, after; - while (!netdata_exit) { + while (service_running(SERVICE_COLLECTORS)) { int iterations = 0; usec_t error_total = 0, error_min = 0, diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index 79abc70708..34130efff3 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -120,7 +120,7 @@ void *pluginsd_worker_thread(void *arg) cd->obsolete = 0; size_t count = 0; - while (!netdata_exit) { + while (service_running(SERVICE_COLLECTORS)) { FILE *fp_child_input = NULL; FILE *fp_child_output = netdata_popen(cd->cmd, &cd->pid, &fp_child_input); if (unlikely(!fp_child_input || !fp_child_output)) { @@ -186,12 +186,12 @@ void *pluginsd_main(void *ptr) // so that we don't log broken directories on each loop int directory_errors[PLUGINSD_MAX_DIRECTORIES] = { 0 }; - while (!netdata_exit) { + while (service_running(SERVICE_COLLECTORS)) { int idx; const char *directory_name; for (idx = 0; idx < PLUGINSD_MAX_DIRECTORIES && (directory_name = plugin_directories[idx]); idx++) { - if (unlikely(netdata_exit)) + if (unlikely(!service_running(SERVICE_COLLECTORS))) break; errno = 0; @@ -206,7 +206,7 @@ void *pluginsd_main(void *ptr) struct dirent *file = NULL; while (likely((file = readdir(dir)))) { - if (unlikely(netdata_exit)) + if (unlikely(!service_running(SERVICE_COLLECTORS))) break; debug(D_PLUGINSD, "examining file '%s'", file->d_name); diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 5501c12fad..264d0eca41 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -441,19 +441,20 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) } else rrddim_isnot_obsolete(st, rd); + bool should_update_dimension = false; + if (likely(unhide_dimension)) { rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN); - if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { - rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN); - metaqueue_dimension_update_flags(rd); - } + should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN); } else { rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN); - if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { - rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN); - metaqueue_dimension_update_flags(rd); - } + should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN); + } + + if (should_update_dimension) { + rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE); + rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); } return PARSER_RC_OK; @@ -883,7 +884,7 @@ PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __may host->rrdlabels = rrdlabels_create(); rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels)); - metaqueue_store_host_labels(host->machine_guid); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels); ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL; @@ -991,7 +992,7 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) { if (unlikely(end_time - start_time != st->update_every)) - rrdset_set_update_every(st, end_time - start_time); + rrdset_set_update_every_s(st, end_time - start_time); st->last_collected_time.tv_sec = end_time; st->last_collected_time.tv_usec = 0; @@ -1251,6 +1252,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) st->counter++; st->counter_done++; + store_metric_collection_completed(); #ifdef NETDATA_LOG_REPLICATION_REQUESTS st->replay.start_streaming = false; @@ -1262,7 +1264,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) if (start_streaming) { if (st->update_every != update_every_child) - rrdset_set_update_every(st, update_every_child); + rrdset_set_update_every_s(st, update_every_child); if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); @@ -1339,7 +1341,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi user.parser = parser; while (likely(!parser_next(parser))) { - if (unlikely(netdata_exit || parser_action(parser, NULL))) + if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, NULL))) break; } diff --git a/collectors/proc.plugin/plugin_proc.c b/collectors/proc.plugin/plugin_proc.c index 1b24df45fd..220f9cb3db 100644 --- a/collectors/proc.plugin/plugin_proc.c +++ b/collectors/proc.plugin/plugin_proc.c @@ -128,15 +128,15 @@ void *proc_main(void *ptr) heartbeat_t hb; heartbeat_init(&hb); - while (!netdata_exit) { + while (service_running(SERVICE_COLLECTORS)) { worker_is_idle(); usec_t hb_dt = heartbeat_next(&hb, step); - if (unlikely(netdata_exit)) + if (unlikely(!service_running(SERVICE_COLLECTORS))) break; for (i = 0; proc_modules[i].name; i++) { - if (unlikely(netdata_exit)) + if (unlikely(!service_running(SERVICE_COLLECTORS))) break; struct proc_module *pm = &proc_modules[i]; diff --git a/collectors/proc.plugin/proc_net_dev.c b/collectors/proc.plugin/proc_net_dev.c index 4f8a86cb9f..82f3ec1946 100644 --- a/collectors/proc.plugin/proc_net_dev.c +++ b/collectors/proc.plugin/proc_net_dev.c @@ -1504,11 +1504,11 @@ void *netdev_main(void *ptr) heartbeat_t hb; heartbeat_init(&hb); - while (!netdata_exit) { + while (service_running(SERVICE_COLLECTORS)) { worker_is_idle(); usec_t hb_dt = heartbeat_next(&hb, step); - if (unlikely(netdata_exit)) + if (unlikely(!service_running(SERVICE_COLLECTORS))) break; worker_is_busy(0); diff --git a/collectors/statsd.plugin/statsd.c b/collectors/statsd.plugin/statsd.c index 67d7ed2e26..b8a62fb9b1 100644 --- a/collectors/statsd.plugin/statsd.c +++ b/collectors/statsd.plugin/statsd.c @@ -234,7 +234,8 @@ typedef struct statsd_app { // global statsd data struct collection_thread_status { - int status; + SPINLOCK spinlock; + bool running; size_t max_sockets; netdata_thread_t thread; @@ -875,7 +876,7 @@ struct statsd_tcp { #ifdef HAVE_RECVMMSG struct statsd_udp { - int *running; + struct collection_thread_status *status; STATSD_SOCKET_DATA_TYPE type; size_t size; struct iovec *iovecs; @@ -1097,7 +1098,9 @@ static int statsd_snd_callback(POLLINFO *pi, short int *events) { void statsd_collector_thread_cleanup(void *data) { struct statsd_udp *d = data; - *d->running = 0; + netdata_spinlock_lock(&d->status->spinlock); + d->status->running = false; + netdata_spinlock_unlock(&d->status->spinlock); info("cleaning up..."); @@ -1114,9 +1117,15 @@ void statsd_collector_thread_cleanup(void *data) { worker_unregister(); } +static bool statsd_should_stop(void) { + return !service_running(SERVICE_COLLECTORS); +} + void *statsd_collector_thread(void *ptr) { struct collection_thread_status *status = ptr; - status->status = 1; + netdata_spinlock_lock(&status->spinlock); + status->running = true; + netdata_spinlock_unlock(&status->spinlock); worker_register("STATSD"); worker_register_job_name(WORKER_JOB_TYPE_TCP_CONNECTED, "tcp connect"); @@ -1127,7 +1136,7 @@ void *statsd_collector_thread(void *ptr) { info("STATSD collector thread started with taskid %d", gettid()); struct statsd_udp *d = callocz(sizeof(struct statsd_udp), 1); - d->running = &status->status; + d->status = status; netdata_thread_cleanup_push(statsd_collector_thread_cleanup, d); @@ -1152,6 +1161,7 @@ void *statsd_collector_thread(void *ptr) { , statsd_rcv_callback , statsd_snd_callback , NULL + , statsd_should_stop , NULL // No access control pattern , 0 // No dns lookups for access control pattern , (void *)d @@ -2358,13 +2368,15 @@ static void statsd_main_cleanup(void *data) { if (statsd.collection_threads_status) { int i; for (i = 0; i < statsd.threads; i++) { - if(statsd.collection_threads_status[i].status) { + netdata_spinlock_lock(&statsd.collection_threads_status[i].spinlock); + if(statsd.collection_threads_status[i].running) { info("STATSD: stopping data collection thread %d...", i + 1); netdata_thread_cancel(statsd.collection_threads_status[i].thread); } else { info("STATSD: data collection thread %d found stopped.", i + 1); } + netdata_spinlock_unlock(&statsd.collection_threads_status[i].spinlock); } } @@ -2537,6 +2549,7 @@ void *statsd_main(void *ptr) { statsd.collection_threads_status[i].max_sockets = max_sockets / statsd.threads; char tag[NETDATA_THREAD_TAG_MAX + 1]; snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STATSD_COLLECTOR[%d]", i + 1); + netdata_spinlock_init(&statsd.collection_threads_status[i].spinlock); netdata_thread_create(&statsd.collection_threads_status[i].thread, tag, NETDATA_THREAD_OPTION_DEFAULT, statsd_collector_thread, &statsd.collection_threads_status[i]); } @@ -2753,7 +2766,7 @@ void *statsd_main(void *ptr) { usec_t step = statsd.update_every * USEC_PER_SEC; heartbeat_t hb; heartbeat_init(&hb); - while(!netdata_exit) { + while(service_running(SERVICE_COLLECTORS)) { worker_is_idle(); heartbeat_next(&hb, step); @@ -2781,7 +2794,7 @@ void *statsd_main(void *ptr) { worker_is_busy(WORKER_STATSD_FLUSH_STATS); statsd_update_all_app_charts(); - if(unlikely(netdata_exit)) + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; if(global_statistics_enabled) { diff --git a/collectors/tc.plugin/plugin_tc.c b/collectors/tc.plugin/plugin_tc.c index a2e72ee339..331a787f34 100644 --- a/collectors/tc.plugin/plugin_tc.c +++ b/collectors/tc.plugin/plugin_tc.c @@ -929,7 +929,7 @@ void *tc_main(void *ptr) { snprintfz(command, TC_LINE_MAX, "%s/tc-qos-helper.sh", netdata_configured_primary_plugins_dir); char *tc_script = config_get("plugin:tc", "script to run to get tc values", command); - while(!netdata_exit) { + while(service_running(SERVICE_COLLECTORS)) { FILE *fp_child_input, *fp_child_output; struct tc_device *device = NULL; struct tc_class *class = NULL; @@ -945,7 +945,7 @@ void *tc_main(void *ptr) { char buffer[TC_LINE_MAX+1] = ""; while(fgets(buffer, TC_LINE_MAX, fp_child_output) != NULL) { - if(unlikely(netdata_exit)) break; + if(unlikely(!service_running(SERVICE_COLLECTORS))) break; buffer[TC_LINE_MAX] = '\0'; // debug(D_TC_LOOP, "TC: read '%s'", buffer); @@ -1162,7 +1162,7 @@ void *tc_main(void *ptr) { class = NULL; } - if(unlikely(netdata_exit)) + if(unlikely(!service_running(SERVICE_COLLECTORS))) goto cleanup; if(code == 1 || code == 127) { diff --git a/collectors/timex.plugin/plugin_timex.c b/collectors/timex.plugin/plugin_timex.c index 46cfc57967..84147c8513 100644 --- a/collectors/timex.plugin/plugin_timex.c +++ b/collectors/timex.plugin/plugin_timex.c @@ -64,7 +64,7 @@ void *timex_main(void *ptr) usec_t step = update_every * USEC_PER_SEC; heartbeat_t hb; heartbeat_init(&hb); - while (!netdata_exit) { + while (service_running(SERVICE_COLLECTORS)) { worker_is_idle(); heartbeat_next(&hb, step); worker_is_busy(0); |