diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-03 12:13:48 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-03 12:13:48 +0200 |
commit | a19795e85fd1d026171661c7f97bde8f9f7d0b1a (patch) | |
tree | 080a060e19fcb3a248514cff01e5749b8fad895c | |
parent | cd28c686158840afb020d6b984aa9f96ac656742 (diff) |
do not resend charts upstream when chart variables are being updated (#13946)
* do not resend charts upstream when chart variables are being updated
* re-stream archived hosts that are now being collected
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 21 | ||||
-rw-r--r-- | database/rrd.h | 4 | ||||
-rw-r--r-- | database/rrdhost.c | 1 | ||||
-rw-r--r-- | database/rrdsetvar.c | 6 | ||||
-rw-r--r-- | streaming/rrdpush.c | 32 | ||||
-rw-r--r-- | streaming/rrdpush.h | 2 | ||||
-rw-r--r-- | streaming/sender.c | 15 |
7 files changed, 41 insertions, 40 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 732120c316..1ddc249db9 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -287,6 +287,12 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us "REPLAY: received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %llu, last time %llu).", (unsigned long long)first_entry_child, (unsigned long long)last_entry_child); +// internal_error( +// true, +// "REPLAY host '%s', chart '%s': received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " first time %llu, last time %llu.", +// rrdhost_hostname(host), rrdset_id(st), +// (unsigned long long)first_entry_child, (unsigned long long)last_entry_child); + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0); @@ -1098,12 +1104,12 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) } time_t update_every_child = str2l(get_word(words, num_words, 1)); - time_t first_entry_child = str2l(get_word(words, num_words, 2)); - time_t last_entry_child = str2l(get_word(words, num_words, 3)); + time_t first_entry_child = (time_t)str2ull(get_word(words, num_words, 2)); + time_t last_entry_child = (time_t)str2ull(get_word(words, num_words, 3)); bool start_streaming = (strcmp(get_word(words, num_words, 4), "true") == 0); - time_t first_entry_requested = str2l(get_word(words, num_words, 5)); - time_t last_entry_requested = str2l(get_word(words, num_words, 6)); + time_t first_entry_requested = (time_t)str2ull(get_word(words, num_words, 5)); + time_t last_entry_requested = (time_t)str2ull(get_word(words, num_words, 6)); PARSER_USER_OBJECT *user_object = user; @@ -1116,6 +1122,13 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) return PARSER_RC_ERROR; } +// internal_error(true, +// "REPLAY: host '%s', chart '%s': received " PLUGINSD_KEYWORD_REPLAY_END " child first_t = %llu, last_t = %llu, start_streaming = %s, requested first_t = %llu, last_t = %llu", +// rrdhost_hostname(host), rrdset_id(st), +// (unsigned long long)first_entry_child, (unsigned long long)last_entry_child, +// start_streaming?"true":"false", +// (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested); + ((PARSER_USER_OBJECT *) user)->st = NULL; ((PARSER_USER_OBJECT *) user)->count++; diff --git a/database/rrd.h b/database/rrd.h index 3bdf3515e4..b57df223b1 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -510,9 +510,11 @@ typedef enum rrdset_flags { RRDSET_FLAG_OBSOLETE = (1 << 3), // this is marked by the collector/module as obsolete RRDSET_FLAG_EXPORTING_SEND = (1 << 4), // if set, this chart should be sent to Prometheus web API and external databases RRDSET_FLAG_EXPORTING_IGNORE = (1 << 5), // if set, this chart should not be sent to Prometheus web API and external databases + RRDSET_FLAG_UPSTREAM_SEND = (1 << 6), // if set, this chart should be sent upstream (streaming) RRDSET_FLAG_UPSTREAM_IGNORE = (1 << 7), // if set, this chart should not be sent upstream (streaming) RRDSET_FLAG_UPSTREAM_EXPOSED = (1 << 8), // if set, we have sent this chart definition to netdata parent (streaming) + RRDSET_FLAG_STORE_FIRST = (1 << 9), // if set, do not eliminate the first collection during interpolation RRDSET_FLAG_HETEROGENEOUS = (1 << 10), // if set, the chart is not homogeneous (dimensions in it have multiple algorithms, multipliers or dividers) RRDSET_FLAG_HOMOGENEOUS_CHECK = (1 << 11), // if set, the chart should be checked to determine if the dimensions are homogeneous @@ -532,6 +534,8 @@ typedef enum rrdset_flags { RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 23), // the sending side has completed replication RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 24), // the receiving side has completed replication + + RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 25), // a custom variable has been updated and needs to be exposed to parent } RRDSET_FLAGS; #define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag)) diff --git a/database/rrdhost.c b/database/rrdhost.c index 9a368385ca..08a9d42e42 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -1044,6 +1044,7 @@ void stop_streaming_sender(RRDHOST *host) dictionary_destroy(host->sender->replication_requests); freez(host->sender); host->sender = NULL; + rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED); } void stop_streaming_receiver(RRDHOST *host) diff --git a/database/rrdsetvar.c b/database/rrdsetvar.c index b1800fc7a6..22cf8a1f01 100644 --- a/database/rrdsetvar.c +++ b/database/rrdsetvar.c @@ -268,14 +268,14 @@ void rrdsetvar_custom_chart_variable_set(RRDSET *st, const RRDSETVAR_ACQUIRED *r NETDATA_DOUBLE *v = rs->value; if(*v != value) { *v = value; - - // mark the chart to be sent upstream - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND_VARIABLES); } } } void rrdsetvar_print_to_streaming_custom_chart_variables(RRDSET *st, BUFFER *wb) { + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND_VARIABLES); + // send the chart local custom variables RRDSETVAR *rs; dfe_start_read(st->rrdsetvar_root_index, rs) { diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 31471fb4dc..b015985e6a 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -161,16 +161,7 @@ int rrdpush_init() { // this is for the first iterations of each chart unsigned int remote_clock_resync_iterations = 60; -static inline bool should_send_chart_matching(RRDSET *st) { - // get all the flags we need to check, with one atomic operation - RRDSET_FLAGS flags = rrdset_flag_check(st, - RRDSET_FLAG_UPSTREAM_SEND - | RRDSET_FLAG_UPSTREAM_IGNORE - | RRDSET_FLAG_ANOMALY_RATE_CHART - | RRDSET_FLAG_ANOMALY_DETECTION - | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED - ); - +static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) { if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED)) return false; @@ -220,8 +211,6 @@ int configured_as_parent() { return is_parent; } -#define need_to_send_chart_definition(st) (!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED)) - // chart labels static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { BUFFER *wb = (BUFFER *)data; @@ -334,7 +323,7 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { } // sends the current chart dimensions -void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) { +static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) { buffer_fast_strcat(wb, "BEGIN \"", 7); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "\" ", 2); @@ -365,6 +354,10 @@ void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) } } rrddim_foreach_done(rd); + + if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) + rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); + buffer_fast_strcat(wb, "END\n", 4); } @@ -374,7 +367,8 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host); bool rrdset_push_chart_definition_now(RRDSET *st) { RRDHOST *host = st->rrdhost; - if(unlikely(!rrdhost_can_send_definitions_to_parent(host) || !should_send_chart_matching(st))) + if(unlikely(!rrdhost_can_send_definitions_to_parent(host) + || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST)))) return false; BUFFER *wb = sender_start(host->sender); @@ -412,16 +406,18 @@ void rrdset_done_push(RRDSET *st) { rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); } - if(unlikely(!should_send_chart_matching(st))) + RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST); + + if(unlikely(!should_send_chart_matching(st, rrdset_flags))) return; BUFFER *wb = sender_start(host->sender); - if(unlikely(need_to_send_chart_definition(st))) + if(unlikely(!(rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED))) rrdpush_send_chart_definition(wb, st); - if (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED)) - rrdpush_send_chart_metrics(wb, st, host->sender); + if (likely(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED)) + rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags); sender_commit(host->sender, wb); } diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 4443a84755..819a94cd27 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -272,6 +272,4 @@ void log_sender_capabilities(struct sender_state *s); STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version); int32_t stream_capabilities_to_vn(uint32_t caps); -void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s); - #endif //NETDATA_RRDPUSH_H diff --git a/streaming/sender.c b/streaming/sender.c index 20d57b7135..8579f81475 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -1254,11 +1254,8 @@ void *rrdpush_sender_thread(void *ptr) { rrdpush_claimed_id(s->host); rrdpush_send_host_labels(s->host); - // TO PUSH METRICS WITH DEFINITIONS: - //if(unlikely(s->rrdpush_sender_socket != -1 && __atomic_load_n(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) { - // thread_data->sending_definitions_status = SENDING_DEFINITIONS_DONE; - // rrdhost_flag_set(s->host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS); - //} + rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); continue; } @@ -1280,14 +1277,6 @@ void *rrdpush_sender_thread(void *ptr) { if(outstanding) s->send_attempts++; - else { - if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED) && - !rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) { - // let the data collection threads know we are ready to push metrics - rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); - info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); - } - } if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) { if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) { |