summaryrefslogtreecommitdiffstats
path: root/collectors
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-23 22:18:44 +0200
committerGitHub <noreply@github.com>2023-01-23 22:18:44 +0200
commitdd0f7ae992a8de282c77dc7745c5090e5d65cc28 (patch)
treefecf5514eda33c0a96f4d359f30fd07229d12cf7 /collectors
parentc2c3876c519fbc22a60a5d8b753dc6d8e81e0fed (diff)
DBENGINE v2 - improvements part 7 (#14307)
* run cleanup in workers * when there is a discrepancy between update every, fix it * fix the other occurences of metric update every mismatch * allow resetting the same timestamp * validate flushed pages before committing them to disk * initialize collection with the latest time in mrg * these should be static functions * acquire metrics for writing to detect multiple data collections of the same metric * print the uuid of the metric that is collected twice * log the discrepancies of completed pages * 1 second tolerance * unify validation of pages and related logging across dbengine * make do_flush_pages() thread safe * flush pages runs on libuv workers * added uv events to tp workers * dont cross datafile spinlock and rwlock * should be unlock * prevent the creation of multiple datafiles * break an infinite replication loop * do not log the epxansion of the replication window due to start streaming * log all invalid pages with internal checks * do not shutdown event loop threads * add information about collected page events, to find the root cause of invalid collected pages * rewrite of the gap filling to fix the invalid collected pages problem * handle multiple collections of the same metric gracefully * added log about main cache page conflicts; fix gap filling once again... * keep track of the first metric writer * it should be an internal fatal - it does not harm users * do not check of future timestamps on collected pages, since we inherit the clock of the children; do not check collected pages validity without internal checks * prevent negative replication completion percentage * internal error for the discrepancy of mrg * better logging of dbengine new metrics collection * without internal checks it is unused * prevent pluginsd crash on exit due to calling pthread_cancel() on an exited thread * renames and atomics everywhere * if a datafile cannot be acquired for deletion during shutdown, continue - this can happen when there are hot pages in open cache referencing it * Debug for context load * rrdcontext uuid debug * rrddim uuid debug * rrdeng uuid debug * Revert "rrdeng uuid debug" This reverts commit 393da190826a582e7e6cc90771bf91b175826d8b. * Revert "rrddim uuid debug" This reverts commit 72150b30408294f141b19afcfb35abd7c34777d8. * Revert "rrdcontext uuid debug" This reverts commit 2c3b940dc23f460226e9b2a6861c214e840044d0. * Revert "Debug for context load" This reverts commit 0d880fc1589f128524e0b47abd9ff0714283ce3b. * do not use legacy uuids on multihost dbs * thread safety for journafile size * handle other cases of inconsistent collected pages * make health thread check if it should be running in key loops * do not log uuids Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'collectors')
-rw-r--r--collectors/plugins.d/plugins_d.c123
-rw-r--r--collectors/plugins.d/plugins_d.h13
-rw-r--r--collectors/plugins.d/pluginsd_parser.c11
3 files changed, 92 insertions, 55 deletions
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index e585d237ab..7608f3afcb 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -21,23 +21,54 @@ inline size_t pluginsd_initialize_plugin_directories()
return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0);
}
+static inline void plugin_set_disabled(struct plugind *cd) {
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+ cd->unsafe.enabled = false;
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
+}
+
+bool plugin_is_enabled(struct plugind *cd) {
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+ bool ret = cd->unsafe.enabled;
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
+ return ret;
+}
+
+static inline void plugin_set_running(struct plugind *cd) {
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+ cd->unsafe.running = true;
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
+}
+
+static inline bool plugin_is_running(struct plugind *cd) {
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+ bool ret = cd->unsafe.running;
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
+ return ret;
+}
+
static void pluginsd_worker_thread_cleanup(void *arg)
{
struct plugind *cd = (struct plugind *)arg;
- if (cd->enabled && !cd->obsolete) {
- cd->obsolete = 1;
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+
+ cd->unsafe.running = false;
+ cd->unsafe.thread = 0;
+ pid_t pid = cd->unsafe.pid;
+ cd->unsafe.pid = 0;
+
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
+
+ if (pid) {
info("data collection thread exiting");
- if (cd->pid) {
- siginfo_t info;
- info("killing child process pid %d", cd->pid);
- if (killpid(cd->pid) != -1) {
- info("waiting for child process pid %d to exit...", cd->pid);
- waitid(P_PID, (id_t)cd->pid, &info, WEXITED);
- }
- cd->pid = 0;
+ siginfo_t info;
+ info("killing child process pid %d", pid);
+ if (killpid(pid) != -1) {
+ info("waiting for child process pid %d to exit...", pid);
+ waitid(P_PID, (id_t)pid, &info, WEXITED);
}
}
}
@@ -53,8 +84,8 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd)
if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) {
info(
"'%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.",
- cd->fullfilename, cd->pid,
- cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled.");
+ cd->fullfilename, cd->unsafe.pid,
+ plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled.");
sleep((unsigned int)(cd->update_every * 10));
return;
}
@@ -63,35 +94,33 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd)
error(
"'%s' (pid %d) does not generate useful output, although it reports success (exits with 0)."
"We have tried to collect something %zu times - unsuccessfully. Disabling it.",
- cd->fullfilename, cd->pid, cd->serial_failures);
- cd->enabled = 0;
+ cd->fullfilename, cd->unsafe.pid, cd->serial_failures);
+ plugin_set_disabled(cd);
return;
}
-
- return;
}
static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code)
{
if (worker_ret_code == -1) {
- info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->pid);
- cd->enabled = 0;
+ info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->unsafe.pid);
+ plugin_set_disabled(cd);
return;
}
if (!cd->successful_collections) {
error(
"'%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", cd->fullfilename,
- cd->pid, worker_ret_code);
- cd->enabled = 0;
+ cd->unsafe.pid, worker_ret_code);
+ plugin_set_disabled(cd);
return;
}
if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) {
error(
"'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s",
- cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections,
- cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled.");
+ cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections,
+ plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled.");
sleep((unsigned int)(cd->update_every * 10));
return;
}
@@ -100,48 +129,47 @@ static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_r
error(
"'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)."
"We tried to restart it %zu times, but it failed to generate data. Disabling it.",
- cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections, cd->serial_failures);
- cd->enabled = 0;
+ cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, cd->serial_failures);
+ plugin_set_disabled(cd);
return;
}
-
- return;
}
+
#undef SERIAL_FAILURES_THRESHOLD
-void *pluginsd_worker_thread(void *arg)
+static void *pluginsd_worker_thread(void *arg)
{
worker_register("PLUGINSD");
netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg);
struct plugind *cd = (struct plugind *)arg;
+ plugin_set_running(cd);
- cd->obsolete = 0;
size_t count = 0;
while (service_running(SERVICE_COLLECTORS)) {
FILE *fp_child_input = NULL;
- FILE *fp_child_output = netdata_popen(cd->cmd, &cd->pid, &fp_child_input);
+ FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input);
if (unlikely(!fp_child_input || !fp_child_output)) {
error("Cannot popen(\"%s\", \"r\").", cd->cmd);
break;
}
- info("connected to '%s' running on pid %d", cd->fullfilename, cd->pid);
+ info("connected to '%s' running on pid %d", cd->fullfilename, cd->unsafe.pid);
count = pluginsd_process(localhost, cd, fp_child_input, fp_child_output, 0);
- error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->pid, count);
- killpid(cd->pid);
+ error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->unsafe.pid, count);
+ killpid(cd->unsafe.pid);
- int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->pid);
+ int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid);
if (likely(worker_ret_code == 0))
pluginsd_worker_thread_handle_success(cd);
else
pluginsd_worker_thread_handle_error(cd, worker_ret_code);
- cd->pid = 0;
- if (unlikely(!cd->enabled))
+ cd->unsafe.pid = 0;
+ if (unlikely(!plugin_is_enabled(cd)))
break;
}
worker_unregister();
@@ -158,10 +186,12 @@ static void pluginsd_main_cleanup(void *data)
struct plugind *cd;
for (cd = pluginsd_root; cd; cd = cd->next) {
- if (cd->enabled && !cd->obsolete) {
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+ if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) {
info("stopping plugin thread: %s", cd->id);
- netdata_thread_cancel(cd->thread);
+ netdata_thread_cancel(cd->unsafe.thread);
}
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
}
info("cleanup completed.");
@@ -237,7 +267,7 @@ void *pluginsd_main(void *ptr)
if (unlikely(strcmp(cd->filename, file->d_name) == 0))
break;
- if (likely(cd && !cd->obsolete)) {
+ if (likely(cd && plugin_is_running(cd))) {
debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename);
continue;
}
@@ -252,7 +282,9 @@ void *pluginsd_main(void *ptr)
strncpyz(cd->filename, file->d_name, FILENAME_MAX);
snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename);
- cd->enabled = enabled;
+ cd->unsafe.enabled = enabled;
+ cd->unsafe.running = false;
+
cd->update_every = (int)config_get_number(cd->id, "update every", localhost->rrd_update_every);
cd->started_t = now_realtime_sec();
@@ -266,15 +298,16 @@ void *pluginsd_main(void *ptr)
cd->next = pluginsd_root;
pluginsd_root = cd;
- // it is not currently running
- cd->obsolete = 1;
-
- if (cd->enabled) {
+ if (plugin_is_enabled(cd)) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(tag, NETDATA_THREAD_TAG_MAX, "PD[%s]", pluginname);
+
// spawn a new thread for it
- netdata_thread_create(
- &cd->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, pluginsd_worker_thread, cd);
+ netdata_thread_create(&cd->unsafe.thread,
+ tag,
+ NETDATA_THREAD_OPTION_DEFAULT,
+ pluginsd_worker_thread,
+ cd);
}
}
}
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index a8acf038a4..35af9fe583 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -50,9 +50,6 @@ struct plugind {
char fullfilename[FILENAME_MAX+1]; // with path
char cmd[PLUGINSD_CMD_MAX+1]; // the command that it executes
- volatile pid_t pid;
- netdata_thread_t thread;
-
size_t successful_collections; // the number of times we have seen
// values collected from this plugin
@@ -60,8 +57,14 @@ struct plugind {
// without collecting values
int update_every; // the plugin default data collection frequency
- volatile sig_atomic_t obsolete; // do not touch this structure after setting this to 1
- volatile sig_atomic_t enabled; // if this is enabled or not
+
+ struct {
+ SPINLOCK spinlock;
+ bool running; // do not touch this structure after setting this to 1
+ bool enabled; // if this is enabled or not
+ netdata_thread_t thread;
+ pid_t pid;
+ } unsafe;
time_t started_t;
uint32_t capabilities; // follows the same principles as streaming capabilities
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 50bdc0b301..00c07e9db1 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -1239,7 +1239,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
time_t started = st->rrdhost->receiver->replication_first_time_t;
time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time;
- worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ if(started && current > started)
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
(NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started));
}
@@ -1300,10 +1301,10 @@ static void pluginsd_process_thread_cleanup(void *ptr) {
inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
{
- int enabled = cd->enabled;
+ int enabled = cd->unsafe.enabled;
if (!fp_plugin_input || !fp_plugin_output || !enabled) {
- cd->enabled = 0;
+ cd->unsafe.enabled = 0;
return 0;
}
@@ -1323,7 +1324,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
clearerr(fp_plugin_output);
PARSER_USER_OBJECT user = {
- .enabled = cd->enabled,
+ .enabled = cd->unsafe.enabled,
.host = host,
.cd = cd,
.trust_durations = trust_durations
@@ -1348,7 +1349,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
// free parser with the pop function
netdata_thread_cleanup_pop(1);
- cd->enabled = user.enabled;
+ cd->unsafe.enabled = user.enabled;
size_t count = user.count;
if (likely(count)) {