diff options
36 files changed, 1672 insertions, 871 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 3e0f83422c..97d2dca984 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -13,20 +13,44 @@ static int send_to_plugin(const char *txt, void *data) { #ifdef ENABLE_HTTPS struct netdata_ssl *ssl = parser->ssl_output; if(ssl) { - if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { - size_t size = strlen(txt); - return SSL_write(ssl->conn, txt, (int)size); - } + if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) + return (int)netdata_ssl_write(ssl->conn, (void *)txt, strlen(txt)); - error("cannot write to SSL connection - connection is not ready."); + error("PLUGINSD: cannot send command (SSL)"); return -1; } #endif - FILE *fp = parser->output; - int ret = fprintf(fp, "%s", txt); - fflush(fp); - return ret; + if(parser->fp_output) { + int bytes = fprintf(parser->fp_output, "%s", txt); + if(bytes <= 0) { + error("PLUGINSD: cannot send command (FILE)"); + return -2; + } + fflush(parser->fp_output); + return bytes; + } + + if(parser->fd != -1) { + size_t bytes = 0; + size_t total = strlen(txt); + ssize_t sent; + + do { + sent = write(parser->fd, &txt[bytes], total - bytes); + if(sent <= 0) { + error("PLUGINSD: cannot send command (fd)"); + return -3; + } + bytes += sent; + } + while(bytes < total); + + return (int)bytes; + } + + error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)"); + return -4; } PARSER_RC pluginsd_set(char **words, size_t num_words, void *user) @@ -293,9 +317,25 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us // rrdhost_hostname(host), rrdset_id(st), // (unsigned long long)first_entry_child, (unsigned long long)last_entry_child); - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + bool ok = true; + if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { + +#ifdef NETDATA_INTERNAL_CHECKS + st->replay.start_streaming = false; + st->replay.after = 0; + st->replay.before = 0; +#endif + + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + + ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, + last_entry_child, 0, 0); + } + else { + internal_error(true, "RRDSET: not sending duplicate replication request for chart '%s'", rrdset_id(st)); + } - bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0); return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } @@ -425,7 +465,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void pf->sent_ut = now_realtime_usec(); if(ret < 0) { - error("FUNCTION: failed to send function to plugin, fprintf() returned error %d", ret); + error("FUNCTION: failed to send function to plugin, error %d", ret); rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); } else { @@ -847,42 +887,54 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use char *id = get_word(words, num_words, 1); char *start_time_str = get_word(words, num_words, 2); char *end_time_str = get_word(words, num_words, 3); + char *child_now_str = get_word(words, num_words, 4); RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; if (unlikely(!id || (!st && !*id))) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host)); + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host)); goto disable; } if(*id) { st = rrdset_find(host, id); if (unlikely(!st)) { - error("requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.", id, rrdhost_hostname(host)); goto disable; } ((PARSER_USER_OBJECT *) user)->st = st; - ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; + } + + if(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE) && !rrdset_flag_check(st, RRDSET_FLAG_ARCHIVED)) { + error("REPLAY: chart '%s' on host '%s' has the OBSOLETE flag set, but it is collected.", rrdset_id(st), rrdhost_hostname(host)); + rrdset_isnot_obsolete(st); } if(start_time_str && end_time_str) { time_t start_time = strtol(start_time_str, NULL, 0); time_t end_time = strtol(end_time_str, NULL, 0); - if(start_time && end_time) { - if (start_time > end_time) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.", - rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time); - goto disable; - } + time_t wall_clock_time = 0, tolerance; + if(child_now_str) { + wall_clock_time = strtol(child_now_str, NULL, 0); + tolerance = 1; + } + + if(wall_clock_time <= 0) { + wall_clock_time = now_realtime_sec(); + tolerance = st->update_every + 60; + } + + internal_error( + (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)), + "REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, which does not match our request (%ld to %ld).", + rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, st->replay.after, st->replay.before); - if (end_time - start_time != st->update_every) + if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) { + if (unlikely(end_time - start_time != st->update_every)) rrdset_set_update_every(st, end_time - start_time); st->last_collected_time.tv_sec = end_time; @@ -891,11 +943,6 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use st->last_updated.tv_sec = end_time; st->last_updated.tv_usec = 0; - ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time; - ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; - st->counter++; st->counter_done++; @@ -903,9 +950,31 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use st->current_entry++; if(st->current_entry >= st->entries) st->current_entry -= st->entries; + + ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time; + ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true; + + return PARSER_RC_OK; } + + internal_error(true, + "REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, but timestamps are invalid (now is %ld).", + rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, wall_clock_time); } + // the child sends an RBEGIN without any parameters initially + // setting rset_enabled to false, means the RSET should not store any metrics + // to store metrics, the RBEGIN needs to have timestamps + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; return PARSER_RC_OK; disable: @@ -915,6 +984,9 @@ disable: PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) { + if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled) + return PARSER_RC_OK; + char *dimension = get_word(words, num_words, 1); char *value_str = get_word(words, num_words, 2); char *flags_str = get_word(words, num_words, 3); @@ -923,20 +995,22 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; if (unlikely(!st)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", dimension, rrdhost_hostname(host)); goto disable; } if (unlikely(!dimension || !*dimension)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on chart '%s' of host '%s', without a dimension. Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on chart '%s' of host '%s', without a dimension. Disabling it.", rrdset_id(st), rrdhost_hostname(host)); goto disable; } if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without timings from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", - dimension, rrdhost_hostname(host)); + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + dimension, rrdhost_hostname(host), + ((PARSER_USER_OBJECT *) user)->replay.start_time, + ((PARSER_USER_OBJECT *) user)->replay.end_time); goto disable; } @@ -946,14 +1020,11 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) if(unlikely(!flags_str)) flags_str = ""; - if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug(D_PLUGINSD, "REPLAY: is replaying dimension '%s'/'%s' to '%s'", rrdset_id(st), dimension, value_str); - if (likely(value_str)) { RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension); RRDDIM *rd = rrddim_acquired_to_rrddim(rda); if(unlikely(!rd)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.", dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost)); goto disable; } @@ -961,7 +1032,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED); if(unlikely(rd_flags & RRDDIM_FLAG_OBSOLETE)) { - error("Dimension %s in chart '%s' has the OBSOLETE flag set, but it is collected.", rrddim_name(rd), rrdset_id(st)); + error("REPLAY: dimension '%s' in chart '%s' has the OBSOLETE flag set, but it is collected.", rrddim_name(rd), rrdset_id(st)); rrddim_isnot_obsolete(st, rd); } @@ -998,7 +1069,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) rd->collections_counter++; } else - error("Dimension %s in chart '%s' has the ARCHIVED flag set, but it is collected. Ignoring data.", rrddim_name(rd), rrdset_id(st)); + error("REPLAY: dimension '%s' in chart '%s' has the ARCHIVED flag set, but it is collected. Ignoring data.", rrddim_name(rd), rrdset_id(st)); rrddim_acquired_release(rda); } @@ -1021,13 +1092,13 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; if (unlikely(!st)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", dimension, rrdhost_hostname(host)); goto disable; } if (unlikely(!dimension || !*dimension)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on chart '%s' of host '%s', without a dimension. Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on chart '%s' of host '%s', without a dimension. Disabling it.", rrdset_id(st), rrdhost_hostname(host)); goto disable; } @@ -1035,7 +1106,7 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension); RRDDIM *rd = rrddim_acquired_to_rrddim(rda); if(unlikely(!rd)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.", dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost)); goto disable; } @@ -1067,7 +1138,7 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; if (unlikely(!st)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", rrdhost_hostname(host)); goto disable; } @@ -1117,7 +1188,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; if (unlikely(!st)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", rrdhost_hostname(host)); return PARSER_RC_ERROR; } @@ -1132,15 +1203,41 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) ((PARSER_USER_OBJECT *) user)->st = NULL; ((PARSER_USER_OBJECT *) user)->count++; + if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) { + time_t now = now_realtime_sec(); + time_t started = st->rrdhost->receiver->replication_first_time_t; + time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time; + + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, + (NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started)); + } + + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; + st->counter++; st->counter_done++; +#ifdef NETDATA_INTERNAL_CHECKS + st->replay.start_streaming = false; + st->replay.after = 0; + st->replay.before = 0; +#endif + if (start_streaming) { if (st->update_every != update_every_child) rrdset_set_update_every(st, update_every_child); rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK); + + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0); + return PARSER_RC_OK; } @@ -1191,7 +1288,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi }; // fp_plugin_output = our input; fp_plugin_input = our output - PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, PARSER_INPUT_SPLIT, NULL); + PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL); rrd_collector_started(); diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index a76ab211fe..e18b43e580 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -26,6 +26,10 @@ typedef struct parser_user_object { usec_t start_time_ut; usec_t end_time_ut; + + time_t wall_clock_time; + + bool rset_enabled; } replay; } PARSER_USER_OBJECT; diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c index e81fc49d11..2cdf1f71f3 100644 --- a/daemon/global_statistics.c +++ b/daemon/global_statistics.c @@ -1848,6 +1848,7 @@ static struct worker_utilization all_workers_utilization[] = { { .name = "TIMEX", .family = "workers plugin timex", .priority = 1000000 }, { .name = "IDLEJITTER", .family = "workers plugin idlejitter", .priority = 1000000 }, { .name = "RRDCONTEXT", .family = "workers contexts", .priority = 1000000 }, + { .name = "REPLICATION", .family = "workers replication sender", .priority = 1000000 }, { .name = "SERVICE", .family = "workers service", .priority = 1000000 }, // has to be terminated with a NULL @@ -2203,7 +2204,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) { { size_t i; for (i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES; i++) { - if(wu->per_job_type[i].type != WORKER_METRIC_INCREMENTAL) + if(wu->per_job_type[i].type != WORKER_METRIC_INCREMENT && wu->per_job_type[i].type != WORKER_METRIC_INCREMENTAL_TOTAL) continue; if(!wu->per_job_type[i].count_value) diff --git a/daemon/main.c b/daemon/main.c index 5c437d208d..67d24d6977 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -4,6 +4,7 @@ #include "buildinfo.h" #include "static_threads.h" +bool unittest_running = false; int netdata_zero_metrics_enabled; int netdata_anonymous_statistics_enabled; @@ -678,7 +679,7 @@ static void get_netdata_configured_variables() { // ------------------------------------------------------------------------ // get default Database Engine page cache size in MiB - db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_NO); + db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_YES); default_rrdeng_page_cache_mb = (int) config_get_number(CONFIG_SECTION_DB, "dbengine page cache size MB", default_rrdeng_page_cache_mb); if(default_rrdeng_page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) { error("Invalid page cache size %d given. Defaulting to %d.", default_rrdeng_page_cache_mb, RRDENG_MIN_PAGE_CACHE_SIZE_MB); @@ -982,6 +983,8 @@ int main(int argc, char **argv) { } if(strcmp(optarg, "unittest") == 0) { + unittest_running = true; + if (unit_test_static_threads()) return 1; if (unit_test_buffer()) @@ -1028,24 +1031,31 @@ int main(int argc, char **argv) { #endif #ifdef ENABLE_DBENGINE else if(strcmp(optarg, "mctest") == 0) { + unittest_running = true; return mc_unittest(); } else if(strcmp(optarg, "ctxtest") == 0) { + unittest_running = true; return ctx_unittest(); } else if(strcmp(optarg, "dicttest") == 0) { + unittest_running = true; return dictionary_unittest(10000); } else if(strcmp(optarg, "araltest") == 0) { + unittest_running = true; return aral_unittest(10000); } else if(strcmp(optarg, "stringtest") == 0) { + unittest_running = true; return string_unittest(10000); } else if(strcmp(optarg, "rrdlabelstest") == 0) { + unittest_running = true; return rrdlabels_unittest(); } else if(strcmp(optarg, "metatest") == 0) { + unittest_running = true; return metadata_unittest(); } else if(strncmp(optarg, createdataset_string, strlen(createdataset_string)) == 0) { diff --git a/daemon/service.c b/daemon/service.c index be1aad8164..28fdc84756 100644 --- a/daemon/service.c +++ b/daemon/service.c @@ -156,17 +156,20 @@ static void svc_rrdhost_cleanup_obsolete_charts(RRDHOST *host) { static void svc_rrdset_check_obsoletion(RRDHOST *host) { worker_is_busy(WORKER_JOB_CHILD_CHART_OBSOLETION_CHECK); + time_t now = now_realtime_sec(); time_t last_entry_t; RRDSET *st; rrdset_foreach_read(st, host) { + if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED)) + continue; + last_entry_t = rrdset_last_entry_t(st); - if(last_entry_t && last_entry_t < host->senders_connect_time && host->senders_connect_time - + TIME_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT + ITERATIONS_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT * st->update_every - < now_realtime_sec()) + if(last_entry_t && last_entry_t < host->senders_connect_time && + host->senders_connect_time + TIME_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT + ITERATIONS_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT * st->update_every + < now) rrdset_is_obsolete(st); - } rrdset_foreach_done(st); } diff --git a/daemon/static_threads.c b/daemon/static_threads.c index 707866ee68..9d2f30c355 100644 --- a/daemon/static_threads.c +++ b/daemon/static_threads.c @@ -2,16 +2,17 @@ #include "common.h" -extern void *aclk_main(void *ptr); -extern void *analytics_main(void *ptr); -extern void *checks_main(void *ptr); -extern void *cpuidlejitter_main(void *ptr); -extern void *global_statistics_main(void *ptr); -extern void *health_main(void *ptr); -extern void *pluginsd_main(void *ptr); -extern void *service_main(void *ptr); -extern void *statsd_main(void *ptr); -extern void *timex_main(void *ptr); +void *aclk_main(void *ptr); +void *analytics_main(void *ptr); +void *checks_main(void *ptr); +void *cpuidlejitter_main(void *ptr); +void *global_statistics_main(void *ptr); +void *health_main(void *ptr); +void *pluginsd_main(void *ptr); +void *service_main(void *ptr); +void *statsd_main(void *ptr); +void *timex_main(void *ptr); +void *replication_thread_main(void *ptr __maybe_unused); extern bool global_statistics_enabled; @@ -140,6 +141,16 @@ const struct netdata_static_thread static_threads_common[] = { .start_routine = rrdcontext_main }, + { + .name = "replication", + .config_section = NULL, + .config_name = NULL, + .enabled = 1, + .thread = NULL, + .init_routine = NULL, + .start_routine = replication_thread_main + }, + // terminator { .name = NULL, diff --git a/database/rrd.h b/database/rrd.h index 0069715b90..b548aa7169 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -55,6 +55,7 @@ struct pg_cache_page_index; #include "sqlite/sqlite_health.h" #include "rrdcontext.h" +extern bool unittest_running; extern bool dbengine_enabled; extern size_t storage_tiers; extern size_t storage_tiers_grouping_iterations[RRD_STORAGE_TIERS]; @@ -533,8 +534,9 @@ typedef enum rrdset_flags { RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 22), // the sending side has completed replication RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 23), // the receiving side has completed replication + RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 24), // the receiving side has replication in progress - RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 24), // a custom variable has been updated and needs to be exposed to parent + RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 25), // a custom variable has been updated and needs to be exposed to parent } RRDSET_FLAGS; #define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag)) @@ -658,6 +660,14 @@ struct rrdset { netdata_rwlock_t rwlock; // protection for RRDCALC *base RRDCALC *base; // double linked list of RRDCALC related to this RRDSET } alerts; + +#ifdef NETDATA_INTERNAL_CHECKS + struct { + bool start_streaming; + time_t after; + time_t before; + } replay; +#endif }; #define rrdset_plugin_name(st) string2str((st)->plugin_name) @@ -757,6 +767,8 @@ typedef enum { // Configuration options RRDHOST_OPTION_DELETE_OBSOLETE_CHARTS = (1 << 3), // delete files of obsolete charts RRDHOST_OPTION_DELETE_ORPHAN_HOST = (1 << 4), // delete the entire host when orphan + + RRDHOST_OPTION_REPLICATION |