diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-20 23:47:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-20 23:47:53 +0200 |
commit | 284f6f3aa4f36cefad2601c490510621496c2b53 (patch) | |
tree | 97a7d55627ef7477f431c53a20d0e6f1f738a419 /collectors | |
parent | 2d02484954f68bf7e3015cb649e2f10a9f3c5c95 (diff) |
streaming compression, query planner and replication fixes (#14023)
* streaming compression, query planner and replication fixes
* remove journal v2 stats from global statistics
* disable sql for checking past sql UUIDs
* single threaded replication
* final replication thread using dictionaries and JudyL for sorting the pending requests
* do not timeout the sending socket when there are pending replication requests
* streaming receiver using read() instead of fread()
* remove FILE * from streaming - now using posix read() and write()
* increase timeouts to 10 minutes
* apply sender timeout only when there are metrics that are supposed to be streamed
* error handling in replication
* remove retries on socket read timeout; better error messages
* take into account inbound traffic too to detect that a connection is stale
* remove race conditions from replication thread
* make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed
* 2-minute timeout to retry streaming to a parent that already has this node
* remove unnecessary condition check
* fix compilation warnings
* include judy in replication
* wrappers to handle retries for SSL_read and SSL_write
* compressed bytes read monitoring
* recursive locks on replication to make it faster during flush or cleanup
* replication completion chart at the receiver side
* simplified recursive mutex
* simplified recursive mutex again
Diffstat (limited to 'collectors')
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 189 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.h | 4 |
2 files changed, 147 insertions, 46 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 3e0f83422c..97d2dca984 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -13,20 +13,44 @@ static int send_to_plugin(const char *txt, void *data) { #ifdef ENABLE_HTTPS struct netdata_ssl *ssl = parser->ssl_output; if(ssl) { - if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { - size_t size = strlen(txt); - return SSL_write(ssl->conn, txt, (int)size); - } + if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) + return (int)netdata_ssl_write(ssl->conn, (void *)txt, strlen(txt)); - error("cannot write to SSL connection - connection is not ready."); + error("PLUGINSD: cannot send command (SSL)"); return -1; } #endif - FILE *fp = parser->output; - int ret = fprintf(fp, "%s", txt); - fflush(fp); - return ret; + if(parser->fp_output) { + int bytes = fprintf(parser->fp_output, "%s", txt); + if(bytes <= 0) { + error("PLUGINSD: cannot send command (FILE)"); + return -2; + } + fflush(parser->fp_output); + return bytes; + } + + if(parser->fd != -1) { + size_t bytes = 0; + size_t total = strlen(txt); + ssize_t sent; + + do { + sent = write(parser->fd, &txt[bytes], total - bytes); + if(sent <= 0) { + error("PLUGINSD: cannot send command (fd)"); + return -3; + } + bytes += sent; + } + while(bytes < total); + + return (int)bytes; + } + + error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)"); + return -4; } PARSER_RC pluginsd_set(char **words, size_t num_words, void *user) @@ -293,9 +317,25 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us // rrdhost_hostname(host), rrdset_id(st), // (unsigned long long)first_entry_child, (unsigned long long)last_entry_child); - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + bool ok = true; + if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { + +#ifdef 
NETDATA_INTERNAL_CHECKS + st->replay.start_streaming = false; + st->replay.after = 0; + st->replay.before = 0; +#endif + + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + + ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, + last_entry_child, 0, 0); + } + else { + internal_error(true, "RRDSET: not sending duplicate replication request for chart '%s'", rrdset_id(st)); + } - bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0); return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } @@ -425,7 +465,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void pf->sent_ut = now_realtime_usec(); if(ret < 0) { - error("FUNCTION: failed to send function to plugin, fprintf() returned error %d", ret); + error("FUNCTION: failed to send function to plugin, error %d", ret); rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); } else { @@ -847,42 +887,54 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use char *id = get_word(words, num_words, 1); char *start_time_str = get_word(words, num_words, 2); char *end_time_str = get_word(words, num_words, 3); + char *child_now_str = get_word(words, num_words, 4); RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; if (unlikely(!id || (!st && !*id))) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host)); + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. 
Disabling it.", rrdhost_hostname(host)); goto disable; } if(*id) { st = rrdset_find(host, id); if (unlikely(!st)) { - error("requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.", id, rrdhost_hostname(host)); goto disable; } ((PARSER_USER_OBJECT *) user)->st = st; - ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; + } + + if(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE) && !rrdset_flag_check(st, RRDSET_FLAG_ARCHIVED)) { + error("REPLAY: chart '%s' on host '%s' has the OBSOLETE flag set, but it is collected.", rrdset_id(st), rrdhost_hostname(host)); + rrdset_isnot_obsolete(st); } if(start_time_str && end_time_str) { time_t start_time = strtol(start_time_str, NULL, 0); time_t end_time = strtol(end_time_str, NULL, 0); - if(start_time && end_time) { - if (start_time > end_time) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). 
Disabling it.", - rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time); - goto disable; - } + time_t wall_clock_time = 0, tolerance; + if(child_now_str) { + wall_clock_time = strtol(child_now_str, NULL, 0); + tolerance = 1; + } + + if(wall_clock_time <= 0) { + wall_clock_time = now_realtime_sec(); + tolerance = st->update_every + 60; + } + + internal_error( + (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)), + "REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, which does not match our request (%ld to %ld).", + rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, st->replay.after, st->replay.before); - if (end_time - start_time != st->update_every) + if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) { + if (unlikely(end_time - start_time != st->update_every)) rrdset_set_update_every(st, end_time - start_time); st->last_collected_time.tv_sec = end_time; @@ -891,11 +943,6 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use st->last_updated.tv_sec = end_time; st->last_updated.tv_usec = 0; - ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time; - ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; - st->counter++; st->counter_done++; @@ -903,9 +950,31 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use st->current_entry++; if(st->current_entry >= st->entries) st->current_entry -= st->entries; + + ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time; + ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time; + ((PARSER_USER_OBJECT *) 
user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true; + + return PARSER_RC_OK; } + + internal_error(true, + "REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, but timestamps are invalid (now is %ld).", + rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, wall_clock_time); } + // the child sends an RBEGIN without any parameters initially + // setting rset_enabled to false, means the RSET should not store any metrics + // to store metrics, the RBEGIN needs to have timestamps + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; return PARSER_RC_OK; disable: @@ -915,6 +984,9 @@ disable: PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) { + if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled) + return PARSER_RC_OK; + char *dimension = get_word(words, num_words, 1); char *value_str = get_word(words, num_words, 2); char *flags_str = get_word(words, num_words, 3); @@ -923,20 +995,22 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; if (unlikely(!st)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". 
Disabling it.", dimension, rrdhost_hostname(host)); goto disable; } if (unlikely(!dimension || !*dimension)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on chart '%s' of host '%s', without a dimension. Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on chart '%s' of host '%s', without a dimension. Disabling it.", rrdset_id(st), rrdhost_hostname(host)); goto disable; } if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without timings from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", - dimension, rrdhost_hostname(host)); + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + dimension, rrdhost_hostname(host), + ((PARSER_USER_OBJECT *) user)->replay.start_time, + ((PARSER_USER_OBJECT *) user)->replay.end_time); goto disable; } @@ -946,14 +1020,11 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) if(unlikely(!flags_str)) flags_str = ""; - if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug(D_PLUGINSD, "REPLAY: is replaying dimension '%s'/'%s' to '%s'", rrdset_id(st), dimension, value_str); - if (likely(value_str)) { RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension); RRDDIM *rd = rrddim_acquired_to_rrddim(rda); if(unlikely(!rd)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension '%s' on chart '%s' ('%s') on host '%s', which does not exist. 
Disabling it.", dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost)); goto disable; } @@ -961,7 +1032,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED); if(unlikely(rd_flags & RRDDIM_FLAG_OBSOLETE)) { - error("Dimension %s in chart '%s' has the OBSOLETE flag set, but it is collected.", rrddim_name(rd), rrdset_id(st)); + error("REPLAY: dimension '%s' in chart '%s' has the OBSOLETE flag set, but it is collected.", rrddim_name(rd), rrdset_id(st)); rrddim_isnot_obsolete(st, rd); } @@ -998,7 +1069,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) rd->collections_counter++; } else - error("Dimension %s in chart '%s' has the ARCHIVED flag set, but it is collected. Ignoring data.", rrddim_name(rd), rrdset_id(st)); + error("REPLAY: dimension '%s' in chart '%s' has the ARCHIVED flag set, but it is collected. Ignoring data.", rrddim_name(rd), rrdset_id(st)); rrddim_acquired_release(rda); } @@ -1021,13 +1092,13 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; if (unlikely(!st)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", dimension, rrdhost_hostname(host)); goto disable; } if (unlikely(!dimension || !*dimension)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on chart '%s' of host '%s', without a dimension. Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on chart '%s' of host '%s', without a dimension. 
Disabling it.", rrdset_id(st), rrdhost_hostname(host)); goto disable; } @@ -1035,7 +1106,7 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension); RRDDIM *rd = rrddim_acquired_to_rrddim(rda); if(unlikely(!rd)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.", dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost)); goto disable; } @@ -1067,7 +1138,7 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; if (unlikely(!st)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", rrdhost_hostname(host)); goto disable; } @@ -1117,7 +1188,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; if (unlikely(!st)) { - error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". 
Disabling it.", rrdhost_hostname(host)); return PARSER_RC_ERROR; } @@ -1132,15 +1203,41 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) ((PARSER_USER_OBJECT *) user)->st = NULL; ((PARSER_USER_OBJECT *) user)->count++; + if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) { + time_t now = now_realtime_sec(); + time_t started = st->rrdhost->receiver->replication_first_time_t; + time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time; + + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, + (NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started)); + } + + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; + st->counter++; st->counter_done++; +#ifdef NETDATA_INTERNAL_CHECKS + st->replay.start_streaming = false; + st->replay.after = 0; + st->replay.before = 0; +#endif + if (start_streaming) { if (st->update_every != update_every_child) rrdset_set_update_every(st, update_every_child); rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK); + + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0); + return PARSER_RC_OK; } @@ -1191,7 +1288,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi }; // fp_plugin_output = our input; fp_plugin_input = our output - PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, PARSER_INPUT_SPLIT, NULL); + PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL); rrd_collector_started(); diff --git 
a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index a76ab211fe..e18b43e580 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -26,6 +26,10 @@ typedef struct parser_user_object { usec_t start_time_ut; usec_t end_time_ut; + + time_t wall_clock_time; + + bool rset_enabled; } replay; } PARSER_USER_OBJECT; |