summaryrefslogtreecommitdiffstats
path: root/collectors
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-20 23:47:53 +0200
committerGitHub <noreply@github.com>2022-11-20 23:47:53 +0200
commit284f6f3aa4f36cefad2601c490510621496c2b53 (patch)
tree97a7d55627ef7477f431c53a20d0e6f1f738a419 /collectors
parent2d02484954f68bf7e3015cb649e2f10a9f3c5c95 (diff)
streaming compression, query planner and replication fixes (#14023)
* streaming compression, query planner and replication fixes * remove journal v2 stats from global statistics * disable sql for checking past sql UUIDs * single threaded replication * final replication thread using dictionaries and JudyL for sorting the pending requests * do not timeout the sending socket when there are pending replication requests * streaming receiver using read() instead of fread() * remove FILE * from streaming - now using posix read() and write() * increase timeouts to 10 minutes * apply sender timeout only when there are metrics that are supposed to be streamed * error handling in replication * remove retries on socket read timeout; better error messages * take into account inbound traffic too to detect that a connection is stale * remove race conditions from replication thread * make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed * 2 minutes timeout to retry streaming to a parent that already has this node * remove unecessary condition check * fix compilation warnings * include judy in replication * wrappers to handle retries for SSL_read and SSL_write * compressed bytes read monitoring * recursive locks on replication to make it faster during flush or cleanup * replication completion chart at the receiver side * simplified recursive mutex * simplified recursive mutex again
Diffstat (limited to 'collectors')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c189
-rw-r--r--collectors/plugins.d/pluginsd_parser.h4
2 files changed, 147 insertions, 46 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 3e0f83422c..97d2dca984 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -13,20 +13,44 @@ static int send_to_plugin(const char *txt, void *data) {
#ifdef ENABLE_HTTPS
struct netdata_ssl *ssl = parser->ssl_output;
if(ssl) {
- if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
- size_t size = strlen(txt);
- return SSL_write(ssl->conn, txt, (int)size);
- }
+ if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+ return (int)netdata_ssl_write(ssl->conn, (void *)txt, strlen(txt));
- error("cannot write to SSL connection - connection is not ready.");
+ error("PLUGINSD: cannot send command (SSL)");
return -1;
}
#endif
- FILE *fp = parser->output;
- int ret = fprintf(fp, "%s", txt);
- fflush(fp);
- return ret;
+ if(parser->fp_output) {
+ int bytes = fprintf(parser->fp_output, "%s", txt);
+ if(bytes <= 0) {
+ error("PLUGINSD: cannot send command (FILE)");
+ return -2;
+ }
+ fflush(parser->fp_output);
+ return bytes;
+ }
+
+ if(parser->fd != -1) {
+ size_t bytes = 0;
+ size_t total = strlen(txt);
+ ssize_t sent;
+
+ do {
+ sent = write(parser->fd, &txt[bytes], total - bytes);
+ if(sent <= 0) {
+ error("PLUGINSD: cannot send command (fd)");
+ return -3;
+ }
+ bytes += sent;
+ }
+ while(bytes < total);
+
+ return (int)bytes;
+ }
+
+ error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
+ return -4;
}
PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
@@ -293,9 +317,25 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
// rrdhost_hostname(host), rrdset_id(st),
// (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ bool ok = true;
+ if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+#endif
+
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+
+ ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child,
+ last_entry_child, 0, 0);
+ }
+ else {
+ internal_error(true, "RRDSET: not sending duplicate replication request for chart '%s'", rrdset_id(st));
+ }
- bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0);
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}
@@ -425,7 +465,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
pf->sent_ut = now_realtime_usec();
if(ret < 0) {
- error("FUNCTION: failed to send function to plugin, fprintf() returned error %d", ret);
+ error("FUNCTION: failed to send function to plugin, error %d", ret);
rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
}
else {
@@ -847,42 +887,54 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
char *id = get_word(words, num_words, 1);
char *start_time_str = get_word(words, num_words, 2);
char *end_time_str = get_word(words, num_words, 3);
+ char *child_now_str = get_word(words, num_words, 4);
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
if (unlikely(!id || (!st && !*id))) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host));
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host));
goto disable;
}
if(*id) {
st = rrdset_find(host, id);
if (unlikely(!st)) {
- error("requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.",
id, rrdhost_hostname(host));
goto disable;
}
((PARSER_USER_OBJECT *) user)->st = st;
- ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ }
+
+ if(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE) && !rrdset_flag_check(st, RRDSET_FLAG_ARCHIVED)) {
+ error("REPLAY: chart '%s' on host '%s' has the OBSOLETE flag set, but it is collected.", rrdset_id(st), rrdhost_hostname(host));
+ rrdset_isnot_obsolete(st);
}
if(start_time_str && end_time_str) {
time_t start_time = strtol(start_time_str, NULL, 0);
time_t end_time = strtol(end_time_str, NULL, 0);
- if(start_time && end_time) {
- if (start_time > end_time) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.",
- rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time);
- goto disable;
- }
+ time_t wall_clock_time = 0, tolerance;
+ if(child_now_str) {
+ wall_clock_time = strtol(child_now_str, NULL, 0);
+ tolerance = 1;
+ }
+
+ if(wall_clock_time <= 0) {
+ wall_clock_time = now_realtime_sec();
+ tolerance = st->update_every + 60;
+ }
+
+ internal_error(
+ (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
+ "REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, which does not match our request (%ld to %ld).",
+ rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, st->replay.after, st->replay.before);
- if (end_time - start_time != st->update_every)
+ if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
+ if (unlikely(end_time - start_time != st->update_every))
rrdset_set_update_every(st, end_time - start_time);
st->last_collected_time.tv_sec = end_time;
@@ -891,11 +943,6 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
st->last_updated.tv_sec = end_time;
st->last_updated.tv_usec = 0;
- ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
-
st->counter++;
st->counter_done++;
@@ -903,9 +950,31 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
st->current_entry++;
if(st->current_entry >= st->entries)
st->current_entry -= st->entries;
+
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true;
+
+ return PARSER_RC_OK;
}
+
+ internal_error(true,
+ "REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', from %ld to %ld, but timestamps are invalid (now is %ld).",
+ rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time, wall_clock_time);
}
+ // the child sends an RBEGIN without any parameters initially
+ // setting rset_enabled to false, means the RSET should not store any metrics
+ // to store metrics, the RBEGIN needs to have timestamps
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
return PARSER_RC_OK;
disable:
@@ -915,6 +984,9 @@ disable:
PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
{
+ if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled)
+ return PARSER_RC_OK;
+
char *dimension = get_word(words, num_words, 1);
char *value_str = get_word(words, num_words, 2);
char *flags_str = get_word(words, num_words, 3);
@@ -923,20 +995,22 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!st)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
dimension, rrdhost_hostname(host));
goto disable;
}
if (unlikely(!dimension || !*dimension)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on chart '%s' of host '%s', without a dimension. Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on chart '%s' of host '%s', without a dimension. Disabling it.",
rrdset_id(st), rrdhost_hostname(host));
goto disable;
}
if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without timings from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
- dimension, rrdhost_hostname(host));
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ dimension, rrdhost_hostname(host),
+ ((PARSER_USER_OBJECT *) user)->replay.start_time,
+ ((PARSER_USER_OBJECT *) user)->replay.end_time);
goto disable;
}
@@ -946,14 +1020,11 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
if(unlikely(!flags_str))
flags_str = "";
- if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- debug(D_PLUGINSD, "REPLAY: is replaying dimension '%s'/'%s' to '%s'", rrdset_id(st), dimension, value_str);
-
if (likely(value_str)) {
RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
if(unlikely(!rd)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_SET " to dimension '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
goto disable;
}
@@ -961,7 +1032,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
if(unlikely(rd_flags & RRDDIM_FLAG_OBSOLETE)) {
- error("Dimension %s in chart '%s' has the OBSOLETE flag set, but it is collected.", rrddim_name(rd), rrdset_id(st));
+ error("REPLAY: dimension '%s' in chart '%s' has the OBSOLETE flag set, but it is collected.", rrddim_name(rd), rrdset_id(st));
rrddim_isnot_obsolete(st, rd);
}
@@ -998,7 +1069,7 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
rd->collections_counter++;
}
else
- error("Dimension %s in chart '%s' has the ARCHIVED flag set, but it is collected. Ignoring data.", rrddim_name(rd), rrdset_id(st));
+ error("REPLAY: dimension '%s' in chart '%s' has the ARCHIVED flag set, but it is collected. Ignoring data.", rrddim_name(rd), rrdset_id(st));
rrddim_acquired_release(rda);
}
@@ -1021,13 +1092,13 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!st)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
dimension, rrdhost_hostname(host));
goto disable;
}
if (unlikely(!dimension || !*dimension)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on chart '%s' of host '%s', without a dimension. Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " on chart '%s' of host '%s', without a dimension. Disabling it.",
rrdset_id(st), rrdhost_hostname(host));
goto disable;
}
@@ -1035,7 +1106,7 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
if(unlikely(!rd)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " to dimension with id '%s' on chart '%s' ('%s') on host '%s', which does not exist. Disabling it.",
dimension, rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost));
goto disable;
}
@@ -1067,7 +1138,7 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!st)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
rrdhost_hostname(host));
goto disable;
}
@@ -1117,7 +1188,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!st)) {
- error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ error("REPLAY: got a " PLUGINSD_KEYWORD_REPLAY_END " on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
rrdhost_hostname(host));
return PARSER_RC_ERROR;
}
@@ -1132,15 +1203,41 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
((PARSER_USER_OBJECT *) user)->st = NULL;
((PARSER_USER_OBJECT *) user)->count++;
+ if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) {
+ time_t now = now_realtime_sec();
+ time_t started = st->rrdhost->receiver->replication_first_time_t;
+ time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time;
+
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ (NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started));
+ }
+
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
+
st->counter++;
st->counter_done++;
+#ifdef NETDATA_INTERNAL_CHECKS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+#endif
+
if (start_streaming) {
if (st->update_every != update_every_child)
rrdset_set_update_every(st, update_every_child);
rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
+
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0);
+
return PARSER_RC_OK;
}
@@ -1191,7 +1288,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
};
// fp_plugin_output = our input; fp_plugin_input = our output
- PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, PARSER_INPUT_SPLIT, NULL);
+ PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
rrd_collector_started();
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index a76ab211fe..e18b43e580 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -26,6 +26,10 @@ typedef struct parser_user_object {
usec_t start_time_ut;
usec_t end_time_ut;
+
+ time_t wall_clock_time;
+
+ bool rset_enabled;
} replay;
} PARSER_USER_OBJECT;