summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-03 12:13:48 +0200
committerGitHub <noreply@github.com>2022-11-03 12:13:48 +0200
commita19795e85fd1d026171661c7f97bde8f9f7d0b1a (patch)
tree080a060e19fcb3a248514cff01e5749b8fad895c /streaming
parentcd28c686158840afb020d6b984aa9f96ac656742 (diff)
do not resend charts upstream when chart variables are being updated (#13946)
* do not resend charts upstream when chart variables are being updated * re-stream archived hosts that are now being collected
Diffstat (limited to 'streaming')
-rw-r--r--streaming/rrdpush.c32
-rw-r--r--streaming/rrdpush.h2
-rw-r--r--streaming/sender.c15
3 files changed, 16 insertions, 33 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 31471fb4dc..b015985e6a 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -161,16 +161,7 @@ int rrdpush_init() {
// this is for the first iterations of each chart
unsigned int remote_clock_resync_iterations = 60;
-static inline bool should_send_chart_matching(RRDSET *st) {
- // get all the flags we need to check, with one atomic operation
- RRDSET_FLAGS flags = rrdset_flag_check(st,
- RRDSET_FLAG_UPSTREAM_SEND
- | RRDSET_FLAG_UPSTREAM_IGNORE
- | RRDSET_FLAG_ANOMALY_RATE_CHART
- | RRDSET_FLAG_ANOMALY_DETECTION
- | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED
- );
-
+static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) {
if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
return false;
@@ -220,8 +211,6 @@ int configured_as_parent() {
return is_parent;
}
-#define need_to_send_chart_definition(st) (!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))
-
// chart labels
static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
BUFFER *wb = (BUFFER *)data;
@@ -334,7 +323,7 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
}
// sends the current chart dimensions
-void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) {
+static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) {
buffer_fast_strcat(wb, "BEGIN \"", 7);
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "\" ", 2);
@@ -365,6 +354,10 @@ void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s)
}
}
rrddim_foreach_done(rd);
+
+ if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
+ rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
+
buffer_fast_strcat(wb, "END\n", 4);
}
@@ -374,7 +367,8 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host);
bool rrdset_push_chart_definition_now(RRDSET *st) {
RRDHOST *host = st->rrdhost;
- if(unlikely(!rrdhost_can_send_definitions_to_parent(host) || !should_send_chart_matching(st)))
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
+ || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST))))
return false;
BUFFER *wb = sender_start(host->sender);
@@ -412,16 +406,18 @@ void rrdset_done_push(RRDSET *st) {
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
- if(unlikely(!should_send_chart_matching(st)))
+ RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
+
+ if(unlikely(!should_send_chart_matching(st, rrdset_flags)))
return;
BUFFER *wb = sender_start(host->sender);
- if(unlikely(need_to_send_chart_definition(st)))
+ if(unlikely(!(rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED)))
rrdpush_send_chart_definition(wb, st);
- if (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED))
- rrdpush_send_chart_metrics(wb, st, host->sender);
+ if (likely(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED))
+ rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags);
sender_commit(host->sender, wb);
}
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 4443a84755..819a94cd27 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -272,6 +272,4 @@ void log_sender_capabilities(struct sender_state *s);
STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
int32_t stream_capabilities_to_vn(uint32_t caps);
-void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s);
-
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index 20d57b7135..8579f81475 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -1254,11 +1254,8 @@ void *rrdpush_sender_thread(void *ptr) {
rrdpush_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
- // TO PUSH METRICS WITH DEFINITIONS:
- //if(unlikely(s->rrdpush_sender_socket != -1 && __atomic_load_n(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) {
- // thread_data->sending_definitions_status = SENDING_DEFINITIONS_DONE;
- // rrdhost_flag_set(s->host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS);
- //}
+ rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
continue;
}
@@ -1280,14 +1277,6 @@ void *rrdpush_sender_thread(void *ptr) {
if(outstanding)
s->send_attempts++;
- else {
- if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED) &&
- !rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) {
- // let the data collection threads know we are ready to push metrics
- rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
- info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
- }
- }
if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) {
if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {