diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-23 22:18:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-23 22:18:44 +0200 |
commit | dd0f7ae992a8de282c77dc7745c5090e5d65cc28 (patch) | |
tree | fecf5514eda33c0a96f4d359f30fd07229d12cf7 /collectors | |
parent | c2c3876c519fbc22a60a5d8b753dc6d8e81e0fed (diff) |
DBENGINE v2 - improvements part 7 (#14307)
* run cleanup in workers
* when there is a discrepancy between update every, fix it
* fix the other occurences of metric update every mismatch
* allow resetting the same timestamp
* validate flushed pages before committing them to disk
* initialize collection with the latest time in mrg
* these should be static functions
* acquire metrics for writing to detect multiple data collections of the same metric
* print the uuid of the metric that is collected twice
* log the discrepancies of completed pages
* 1 second tolerance
* unify validation of pages and related logging across dbengine
* make do_flush_pages() thread safe
* flush pages runs on libuv workers
* added uv events to tp workers
* dont cross datafile spinlock and rwlock
* should be unlock
* prevent the creation of multiple datafiles
* break an infinite replication loop
* do not log the epxansion of the replication window due to start streaming
* log all invalid pages with internal checks
* do not shutdown event loop threads
* add information about collected page events, to find the root cause of invalid collected pages
* rewrite of the gap filling to fix the invalid collected pages problem
* handle multiple collections of the same metric gracefully
* added log about main cache page conflicts; fix gap filling once again...
* keep track of the first metric writer
* it should be an internal fatal - it does not harm users
* do not check of future timestamps on collected pages, since we inherit the clock of the children; do not check collected pages validity without internal checks
* prevent negative replication completion percentage
* internal error for the discrepancy of mrg
* better logging of dbengine new metrics collection
* without internal checks it is unused
* prevent pluginsd crash on exit due to calling pthread_cancel() on an exited thread
* renames and atomics everywhere
* if a datafile cannot be acquired for deletion during shutdown, continue - this can happen when there are hot pages in open cache referencing it
* Debug for context load
* rrdcontext uuid debug
* rrddim uuid debug
* rrdeng uuid debug
* Revert "rrdeng uuid debug"
This reverts commit 393da190826a582e7e6cc90771bf91b175826d8b.
* Revert "rrddim uuid debug"
This reverts commit 72150b30408294f141b19afcfb35abd7c34777d8.
* Revert "rrdcontext uuid debug"
This reverts commit 2c3b940dc23f460226e9b2a6861c214e840044d0.
* Revert "Debug for context load"
This reverts commit 0d880fc1589f128524e0b47abd9ff0714283ce3b.
* do not use legacy uuids on multihost dbs
* thread safety for journafile size
* handle other cases of inconsistent collected pages
* make health thread check if it should be running in key loops
* do not log uuids
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'collectors')
-rw-r--r-- | collectors/plugins.d/plugins_d.c | 123 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 13 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 11 |
3 files changed, 92 insertions, 55 deletions
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index e585d237ab..7608f3afcb 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -21,23 +21,54 @@ inline size_t pluginsd_initialize_plugin_directories() return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0); } +static inline void plugin_set_disabled(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + cd->unsafe.enabled = false; + netdata_spinlock_unlock(&cd->unsafe.spinlock); +} + +bool plugin_is_enabled(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + bool ret = cd->unsafe.enabled; + netdata_spinlock_unlock(&cd->unsafe.spinlock); + return ret; +} + +static inline void plugin_set_running(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + cd->unsafe.running = true; + netdata_spinlock_unlock(&cd->unsafe.spinlock); +} + +static inline bool plugin_is_running(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + bool ret = cd->unsafe.running; + netdata_spinlock_unlock(&cd->unsafe.spinlock); + return ret; +} + static void pluginsd_worker_thread_cleanup(void *arg) { struct plugind *cd = (struct plugind *)arg; - if (cd->enabled && !cd->obsolete) { - cd->obsolete = 1; + netdata_spinlock_lock(&cd->unsafe.spinlock); + + cd->unsafe.running = false; + cd->unsafe.thread = 0; + pid_t pid = cd->unsafe.pid; + cd->unsafe.pid = 0; + + netdata_spinlock_unlock(&cd->unsafe.spinlock); + + if (pid) { info("data collection thread exiting"); - if (cd->pid) { - siginfo_t info; - info("killing child process pid %d", cd->pid); - if (killpid(cd->pid) != -1) { - info("waiting for child process pid %d to exit...", cd->pid); - waitid(P_PID, (id_t)cd->pid, &info, WEXITED); - } - cd->pid = 0; + siginfo_t info; + info("killing child process pid %d", pid); + if (killpid(pid) != -1) { + info("waiting for child process pid %d to exit...", pid); + waitid(P_PID, (id_t)pid, &info, WEXITED); } } } @@ -53,8 +84,8 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) { info( "'%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.", - cd->fullfilename, cd->pid, - cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); + cd->fullfilename, cd->unsafe.pid, + plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); sleep((unsigned int)(cd->update_every * 10)); return; } @@ -63,35 +94,33 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) error( "'%s' (pid %d) does not generate useful output, although it reports success (exits with 0)." "We have tried to collect something %zu times - unsuccessfully. Disabling it.", - cd->fullfilename, cd->pid, cd->serial_failures); - cd->enabled = 0; + cd->fullfilename, cd->unsafe.pid, cd->serial_failures); + plugin_set_disabled(cd); return; } - - return; } static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) { if (worker_ret_code == -1) { - info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->pid); - cd->enabled = 0; + info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->unsafe.pid); + plugin_set_disabled(cd); return; } if (!cd->successful_collections) { error( "'%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", cd->fullfilename, - cd->pid, worker_ret_code); - cd->enabled = 0; + cd->unsafe.pid, worker_ret_code); + plugin_set_disabled(cd); return; } if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) { error( "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s", - cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections, - cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); + cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, + plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); sleep((unsigned int)(cd->update_every * 10)); return; } @@ -100,48 +129,47 @@ static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_r error( "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)." "We tried to restart it %zu times, but it failed to generate data. Disabling it.", - cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections, cd->serial_failures); - cd->enabled = 0; + cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, cd->serial_failures); + plugin_set_disabled(cd); return; } - - return; } + #undef SERIAL_FAILURES_THRESHOLD -void *pluginsd_worker_thread(void *arg) +static void *pluginsd_worker_thread(void *arg) { worker_register("PLUGINSD"); netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg); struct plugind *cd = (struct plugind *)arg; + plugin_set_running(cd); - cd->obsolete = 0; size_t count = 0; while (service_running(SERVICE_COLLECTORS)) { FILE *fp_child_input = NULL; - FILE *fp_child_output = netdata_popen(cd->cmd, &cd->pid, &fp_child_input); + FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input); if (unlikely(!fp_child_input || !fp_child_output)) { error("Cannot popen(\"%s\", \"r\").", cd->cmd); break; } - info("connected to '%s' running on pid %d", cd->fullfilename, cd->pid); + info("connected to '%s' running on pid %d", cd->fullfilename, cd->unsafe.pid); count = pluginsd_process(localhost, cd, fp_child_input, fp_child_output, 0); - error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->pid, count); - killpid(cd->pid); + error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->unsafe.pid, count); + killpid(cd->unsafe.pid); - int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->pid); + int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid); if (likely(worker_ret_code == 0)) pluginsd_worker_thread_handle_success(cd); else pluginsd_worker_thread_handle_error(cd, worker_ret_code); - cd->pid = 0; - if (unlikely(!cd->enabled)) + cd->unsafe.pid = 0; + if (unlikely(!plugin_is_enabled(cd))) break; } worker_unregister(); @@ -158,10 +186,12 @@ static void pluginsd_main_cleanup(void *data) struct plugind *cd; for (cd = pluginsd_root; cd; cd = cd->next) { - if (cd->enabled && !cd->obsolete) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) { info("stopping plugin thread: %s", cd->id); - netdata_thread_cancel(cd->thread); + netdata_thread_cancel(cd->unsafe.thread); } + netdata_spinlock_unlock(&cd->unsafe.spinlock); } info("cleanup completed."); @@ -237,7 +267,7 @@ void *pluginsd_main(void *ptr) if (unlikely(strcmp(cd->filename, file->d_name) == 0)) break; - if (likely(cd && !cd->obsolete)) { + if (likely(cd && plugin_is_running(cd))) { debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename); continue; } @@ -252,7 +282,9 @@ void *pluginsd_main(void *ptr) strncpyz(cd->filename, file->d_name, FILENAME_MAX); snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename); - cd->enabled = enabled; + cd->unsafe.enabled = enabled; + cd->unsafe.running = false; + cd->update_every = (int)config_get_number(cd->id, "update every", localhost->rrd_update_every); cd->started_t = now_realtime_sec(); @@ -266,15 +298,16 @@ void *pluginsd_main(void *ptr) cd->next = pluginsd_root; pluginsd_root = cd; - // it is not currently running - cd->obsolete = 1; - - if (cd->enabled) { + if (plugin_is_enabled(cd)) { char tag[NETDATA_THREAD_TAG_MAX + 1]; snprintfz(tag, NETDATA_THREAD_TAG_MAX, "PD[%s]", pluginname); + // spawn a new thread for it - netdata_thread_create( - &cd->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, pluginsd_worker_thread, cd); + netdata_thread_create(&cd->unsafe.thread, + tag, + NETDATA_THREAD_OPTION_DEFAULT, + pluginsd_worker_thread, + cd); } } } diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index a8acf038a4..35af9fe583 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -50,9 +50,6 @@ struct plugind { char fullfilename[FILENAME_MAX+1]; // with path char cmd[PLUGINSD_CMD_MAX+1]; // the command that it executes - volatile pid_t pid; - netdata_thread_t thread; - size_t successful_collections; // the number of times we have seen // values collected from this plugin @@ -60,8 +57,14 @@ struct plugind { // without collecting values int update_every; // the plugin default data collection frequency - volatile sig_atomic_t obsolete; // do not touch this structure after setting this to 1 - volatile sig_atomic_t enabled; // if this is enabled or not + + struct { + SPINLOCK spinlock; + bool running; // do not touch this structure after setting this to 1 + bool enabled; // if this is enabled or not + netdata_thread_t thread; + pid_t pid; + } unsafe; time_t started_t; uint32_t capabilities; // follows the same principles as streaming capabilities diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 50bdc0b301..00c07e9db1 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -1239,7 +1239,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) time_t started = st->rrdhost->receiver->replication_first_time_t; time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time; - worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, + if(started && current > started) + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, (NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started)); } @@ -1300,10 +1301,10 @@ static void pluginsd_process_thread_cleanup(void *ptr) { inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations) { - int enabled = cd->enabled; + int enabled = cd->unsafe.enabled; if (!fp_plugin_input || !fp_plugin_output || !enabled) { - cd->enabled = 0; + cd->unsafe.enabled = 0; return 0; } @@ -1323,7 +1324,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi clearerr(fp_plugin_output); PARSER_USER_OBJECT user = { - .enabled = cd->enabled, + .enabled = cd->unsafe.enabled, .host = host, .cd = cd, .trust_durations = trust_durations @@ -1348,7 +1349,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi // free parser with the pop function netdata_thread_cleanup_pop(1); - cd->enabled = user.enabled; + cd->unsafe.enabled = user.enabled; size_t count = user.count; if (likely(count)) { |