summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-03 12:13:48 +0200
committerGitHub <noreply@github.com>2022-11-03 12:13:48 +0200
commita19795e85fd1d026171661c7f97bde8f9f7d0b1a (patch)
tree080a060e19fcb3a248514cff01e5749b8fad895c
parentcd28c686158840afb020d6b984aa9f96ac656742 (diff)
do not resend charts upstream when chart variables are being updated (#13946)
* do not resend charts upstream when chart variables are being updated * re-stream archived hosts that are now being collected
-rw-r--r--collectors/plugins.d/pluginsd_parser.c21
-rw-r--r--database/rrd.h4
-rw-r--r--database/rrdhost.c1
-rw-r--r--database/rrdsetvar.c6
-rw-r--r--streaming/rrdpush.c32
-rw-r--r--streaming/rrdpush.h2
-rw-r--r--streaming/sender.c15
7 files changed, 41 insertions, 40 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 732120c316..1ddc249db9 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -287,6 +287,12 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
"REPLAY: received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %llu, last time %llu).",
(unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
+// internal_error(
+// true,
+// "REPLAY host '%s', chart '%s': received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " first time %llu, last time %llu.",
+// rrdhost_hostname(host), rrdset_id(st),
+// (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
+
rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0);
@@ -1098,12 +1104,12 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
}
time_t update_every_child = str2l(get_word(words, num_words, 1));
- time_t first_entry_child = str2l(get_word(words, num_words, 2));
- time_t last_entry_child = str2l(get_word(words, num_words, 3));
+ time_t first_entry_child = (time_t)str2ull(get_word(words, num_words, 2));
+ time_t last_entry_child = (time_t)str2ull(get_word(words, num_words, 3));
bool start_streaming = (strcmp(get_word(words, num_words, 4), "true") == 0);
- time_t first_entry_requested = str2l(get_word(words, num_words, 5));
- time_t last_entry_requested = str2l(get_word(words, num_words, 6));
+ time_t first_entry_requested = (time_t)str2ull(get_word(words, num_words, 5));
+ time_t last_entry_requested = (time_t)str2ull(get_word(words, num_words, 6));
PARSER_USER_OBJECT *user_object = user;
@@ -1116,6 +1122,13 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
return PARSER_RC_ERROR;
}
+// internal_error(true,
+// "REPLAY: host '%s', chart '%s': received " PLUGINSD_KEYWORD_REPLAY_END " child first_t = %llu, last_t = %llu, start_streaming = %s, requested first_t = %llu, last_t = %llu",
+// rrdhost_hostname(host), rrdset_id(st),
+// (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
+// start_streaming?"true":"false",
+// (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested);
+
((PARSER_USER_OBJECT *) user)->st = NULL;
((PARSER_USER_OBJECT *) user)->count++;
diff --git a/database/rrd.h b/database/rrd.h
index 3bdf3515e4..b57df223b1 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -510,9 +510,11 @@ typedef enum rrdset_flags {
RRDSET_FLAG_OBSOLETE = (1 << 3), // this is marked by the collector/module as obsolete
RRDSET_FLAG_EXPORTING_SEND = (1 << 4), // if set, this chart should be sent to Prometheus web API and external databases
RRDSET_FLAG_EXPORTING_IGNORE = (1 << 5), // if set, this chart should not be sent to Prometheus web API and external databases
+
RRDSET_FLAG_UPSTREAM_SEND = (1 << 6), // if set, this chart should be sent upstream (streaming)
RRDSET_FLAG_UPSTREAM_IGNORE = (1 << 7), // if set, this chart should not be sent upstream (streaming)
RRDSET_FLAG_UPSTREAM_EXPOSED = (1 << 8), // if set, we have sent this chart definition to netdata parent (streaming)
+
RRDSET_FLAG_STORE_FIRST = (1 << 9), // if set, do not eliminate the first collection during interpolation
RRDSET_FLAG_HETEROGENEOUS = (1 << 10), // if set, the chart is not homogeneous (dimensions in it have multiple algorithms, multipliers or dividers)
RRDSET_FLAG_HOMOGENEOUS_CHECK = (1 << 11), // if set, the chart should be checked to determine if the dimensions are homogeneous
@@ -532,6 +534,8 @@ typedef enum rrdset_flags {
RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 23), // the sending side has completed replication
RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 24), // the receiving side has completed replication
+
+ RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 25), // a custom variable has been updated and needs to be exposed to parent
} RRDSET_FLAGS;
#define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag))
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 9a368385ca..08a9d42e42 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -1044,6 +1044,7 @@ void stop_streaming_sender(RRDHOST *host)
dictionary_destroy(host->sender->replication_requests);
freez(host->sender);
host->sender = NULL;
+ rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_INITIALIZED);
}
void stop_streaming_receiver(RRDHOST *host)
diff --git a/database/rrdsetvar.c b/database/rrdsetvar.c
index b1800fc7a6..22cf8a1f01 100644
--- a/database/rrdsetvar.c
+++ b/database/rrdsetvar.c
@@ -268,14 +268,14 @@ void rrdsetvar_custom_chart_variable_set(RRDSET *st, const RRDSETVAR_ACQUIRED *r
NETDATA_DOUBLE *v = rs->value;
if(*v != value) {
*v = value;
-
- // mark the chart to be sent upstream
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND_VARIABLES);
}
}
}
void rrdsetvar_print_to_streaming_custom_chart_variables(RRDSET *st, BUFFER *wb) {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND_VARIABLES);
+
// send the chart local custom variables
RRDSETVAR *rs;
dfe_start_read(st->rrdsetvar_root_index, rs) {
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 31471fb4dc..b015985e6a 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -161,16 +161,7 @@ int rrdpush_init() {
// this is for the first iterations of each chart
unsigned int remote_clock_resync_iterations = 60;
-static inline bool should_send_chart_matching(RRDSET *st) {
- // get all the flags we need to check, with one atomic operation
- RRDSET_FLAGS flags = rrdset_flag_check(st,
- RRDSET_FLAG_UPSTREAM_SEND
- | RRDSET_FLAG_UPSTREAM_IGNORE
- | RRDSET_FLAG_ANOMALY_RATE_CHART
- | RRDSET_FLAG_ANOMALY_DETECTION
- | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED
- );
-
+static inline bool should_send_chart_matching(RRDSET *st, RRDSET_FLAGS flags) {
if(!(flags & RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
return false;
@@ -220,8 +211,6 @@ int configured_as_parent() {
return is_parent;
}
-#define need_to_send_chart_definition(st) (!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))
-
// chart labels
static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
BUFFER *wb = (BUFFER *)data;
@@ -334,7 +323,7 @@ static inline void rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
}
// sends the current chart dimensions
-void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s) {
+static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s, RRDSET_FLAGS flags) {
buffer_fast_strcat(wb, "BEGIN \"", 7);
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "\" ", 2);
@@ -365,6 +354,10 @@ void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s)
}
}
rrddim_foreach_done(rd);
+
+ if(unlikely(flags & RRDSET_FLAG_UPSTREAM_SEND_VARIABLES))
+ rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
+
buffer_fast_strcat(wb, "END\n", 4);
}
@@ -374,7 +367,8 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host);
bool rrdset_push_chart_definition_now(RRDSET *st) {
RRDHOST *host = st->rrdhost;
- if(unlikely(!rrdhost_can_send_definitions_to_parent(host) || !should_send_chart_matching(st)))
+ if(unlikely(!rrdhost_can_send_definitions_to_parent(host)
+ || !should_send_chart_matching(st, __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST))))
return false;
BUFFER *wb = sender_start(host->sender);
@@ -412,16 +406,18 @@ void rrdset_done_push(RRDSET *st) {
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
- if(unlikely(!should_send_chart_matching(st)))
+ RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
+
+ if(unlikely(!should_send_chart_matching(st, rrdset_flags)))
return;
BUFFER *wb = sender_start(host->sender);
- if(unlikely(need_to_send_chart_definition(st)))
+ if(unlikely(!(rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED)))
rrdpush_send_chart_definition(wb, st);
- if (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED))
- rrdpush_send_chart_metrics(wb, st, host->sender);
+ if (likely(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED))
+ rrdpush_send_chart_metrics(wb, st, host->sender, rrdset_flags);
sender_commit(host->sender, wb);
}
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 4443a84755..819a94cd27 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -272,6 +272,4 @@ void log_sender_capabilities(struct sender_state *s);
STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
int32_t stream_capabilities_to_vn(uint32_t caps);
-void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_state *s);
-
#endif //NETDATA_RRDPUSH_H
diff --git a/streaming/sender.c b/streaming/sender.c
index 20d57b7135..8579f81475 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -1254,11 +1254,8 @@ void *rrdpush_sender_thread(void *ptr) {
rrdpush_claimed_id(s->host);
rrdpush_send_host_labels(s->host);
- // TO PUSH METRICS WITH DEFINITIONS:
- //if(unlikely(s->rrdpush_sender_socket != -1 && __atomic_load_n(&s->host->rrdpush_sender_connected, __ATOMIC_SEQ_CST))) {
- // thread_data->sending_definitions_status = SENDING_DEFINITIONS_DONE;
- // rrdhost_flag_set(s->host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS);
- //}
+ rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
+ info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
continue;
}
@@ -1280,14 +1277,6 @@ void *rrdpush_sender_thread(void *ptr) {
if(outstanding)
s->send_attempts++;
- else {
- if(unlikely(rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED) &&
- !rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS))) {
- // let the data collection threads know we are ready to push metrics
- rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
- info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
- }
- }
if(unlikely(s->rrdpush_sender_pipe[PIPE_READ] == -1)) {
if(!rrdpush_sender_pipe_close(s->host, s->rrdpush_sender_pipe, true)) {