diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-03 12:13:48 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-03 12:13:48 +0200 |
commit | a19795e85fd1d026171661c7f97bde8f9f7d0b1a (patch) | |
tree | 080a060e19fcb3a248514cff01e5749b8fad895c /streaming | |
parent | cd28c686158840afb020d6b984aa9f96ac656742 (diff) |
do not resend charts upstream when chart variables are being updated (#13946)
* do not resend charts upstream when chart variables are being updated
* re-stream archived hosts that are now being collected
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/rrdpush.c | 32 | ||||
-rw-r--r-- | streaming/rrdpush.h | 2 | ||||
-rw-r--r-- | streaming/sender.c | 15 |
3 files changed, 16 insertions, 33 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 31471fb4dc..b015985e6a 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -161,16 +161,7 @@ int rrdpush_init() { // this is for the first iterations of each chart unsigned int remote_clock_resync_iterations = 60; -static inline bool should_send_chart_matching(RRDSET *st) { - // get all the flags we need to check, with one atomic operation - RRDSET_FLAGS flags = rrdset_flag_check(st, - RRDSET_FLAG_UPSTREAM_SEND - | RRDSET_FLAG_UPSTREAM_IGNORE - | RRDSET_FLAG_ANOMALY_RATE_CHART - | RRDSET_FLAG_ANOMALY_DETECTION - | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED - ); - +static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) { if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED)) return false; @@ -220,8 +211,6 @@ int configured_as_parent() { return is_parent; } -#define need_to_send_chart_definition(st) (!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED)) - // chart labels static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { BUFFER *wb = (BUFFER *)data; @@ -334,7 +323,7 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) { } // sends the current chart dimensions -void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) { +static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) { buffer_fast_strcat(wb, "BEGIN \"", 7); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "\" ", 2); @@ -365,6 +354,10 @@ void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) } } rrddim_foreach_done(rd); + + if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES)) + rrdsetvar_print_to_streaming_custom_chart_variables(st, wb); + buffer_fast_strcat(wb, "END\n", 4); } @@ -374,7 +367,8 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host); bool rrdset_push_chart_definition_now(RRDSET *st) { RRDHOST *host = st->rrdhost; - if(unlikely(!rrdhost_can_send_definitions_to_parent(host) || !should_send_chart_matching(st))) + if(unlikely(!rrdhost_can_send_definitions_to_parent(host) + || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST)))) return false; BUFFER *wb = sender_start(host->sender); @@ -412,16 +406,18 @@ void rrdset_done_push(RRDSET *st) { rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); } - if(unlikely(!should_send_chart_matching(st))) + RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST); + + if(unlikely(!should_send_chart_matching(st, rrdset_flags))) return; BUFFER *wb = sender_start(host->sender); - if(unlikely(need_to_send_chart_definition(st))) + if(unlikely(!(rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED))) rrdpush_send_chart_definition(wb, st); - if (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED)) - rrdpush_send_chart_metrics(wb, st, host->sender); + if (likely(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED)) + rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags); sender_commit(host->sender, wb); } diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 4443a84755..819a94cd27 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -272,6 +272,4 @@ void log_sender_capabilities(struct sender_state *s); STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version); int32_t stream_capabilities_to_vn(uint32_t caps); -void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s); - #endif //NETDATA_RRDPUSH_H diff --git a/streaming/sender.c b/streaming/sender.c index 20d57b7135..8579f81475 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -1254,11 +1254,8 @@ void *rrdpush_sender_thread(void *ptr) { rrdpush_claimed_id(s->host); rrdpush_send_host_labels(s->host); - // TO PUSH METRICS WITH DEFINITIONS: - //if(unlikely(s->rrdpush_sender_socket != -1 && __atomic_load_n(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) { - // thread_data->sending_definitions_status = SENDING_DEFINITIONS_DONE; - // rrdhost_flag_set(s->host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS); - //} + rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); + info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); continue; } @@ -1280,14 +1277,6 @@ void *rrdpush_sender_thread(void *ptr) { if(outstanding) s->send_attempts++; - else { - if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED) && - !rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) { - // let the data collection threads know we are ready to push metrics - rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); - info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); - } - } if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) { if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) { |