summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--database/rrddim.c6
-rw-r--r--libnetdata/buffer/buffer.c12
-rw-r--r--libnetdata/buffer/buffer.h1
-rw-r--r--streaming/rrdpush.c102
4 files changed, 76 insertions, 45 deletions
diff --git a/database/rrddim.c b/database/rrddim.c
index 1ee9a88b87..a06088ac56 100644
--- a/database/rrddim.c
+++ b/database/rrddim.c
@@ -363,8 +363,8 @@ inline int rrddim_set_algorithm(RRDSET *st, RRDDIM *rd, RRD_ALGORITHM algorithm)
debug(D_RRD_CALLS, "Updating algorithm of dimension '%s/%s' from %s to %s", rrdset_id(st), rrddim_name(rd), rrd_algorithm_name(rd->algorithm), rrd_algorithm_name(algorithm));
rd->algorithm = algorithm;
rd->exposed = 0;
- rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
rrdcontext_updated_rrddim_algorithm(rd);
return 1;
}
@@ -376,8 +376,8 @@ inline int rrddim_set_multiplier(RRDSET *st, RRDDIM *rd, collected_number multip
debug(D_RRD_CALLS, "Updating multiplier of dimension '%s/%s' from " COLLECTED_NUMBER_FORMAT " to " COLLECTED_NUMBER_FORMAT, rrdset_id(st), rrddim_name(rd), rd->multiplier, multiplier);
rd->multiplier = multiplier;
rd->exposed = 0;
- rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
rrdcontext_updated_rrddim_multiplier(rd);
return 1;
}
@@ -389,8 +389,8 @@ inline int rrddim_set_divisor(RRDSET *st, RRDDIM *rd, collected_number divisor)
debug(D_RRD_CALLS, "Updating divisor of dimension '%s/%s' from " COLLECTED_NUMBER_FORMAT " to " COLLECTED_NUMBER_FORMAT, rrdset_id(st), rrddim_name(rd), rd->divisor, divisor);
rd->divisor = divisor;
rd->exposed = 0;
- rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK);
rrdcontext_updated_rrddim_divisor(rd);
return 1;
}
diff --git a/libnetdata/buffer/buffer.c b/libnetdata/buffer/buffer.c
index 8a32184f60..3d29c30255 100644
--- a/libnetdata/buffer/buffer.c
+++ b/libnetdata/buffer/buffer.c
@@ -136,6 +136,18 @@ void buffer_print_llu(BUFFER *wb, unsigned long long uvalue)
wb->len += wstr - str;
}
+void buffer_print_ll(BUFFER *wb, long long value)
+{
+ buffer_need_bytes(wb, 50);
+
+ if(value < 0) {
+ buffer_fast_strcat(wb, "-", 1);
+ value = -value;
+ }
+
+ buffer_print_llu(wb, value);
+}
+
void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) {
if(unlikely(!txt || !*txt)) return;
diff --git a/libnetdata/buffer/buffer.h b/libnetdata/buffer/buffer.h
index 42425b4cb7..c134cc9aca 100644
--- a/libnetdata/buffer/buffer.h
+++ b/libnetdata/buffer/buffer.h
@@ -78,6 +78,7 @@ extern char *print_number_llu_r(char *str, unsigned long long uvalue);
extern char *print_number_llu_r_smart(char *str, unsigned long long uvalue);
extern void buffer_print_llu(BUFFER *wb, unsigned long long uvalue);
+extern void buffer_print_ll(BUFFER *wb, long long value);
static inline void buffer_need_bytes(BUFFER *buffer, size_t needed_free_size) {
if(unlikely(buffer->size - buffer->len < needed_free_size))
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 514bb59c92..9fa9793de2 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -129,29 +129,44 @@ int rrdpush_init() {
unsigned int remote_clock_resync_iterations = 60;
-static inline int should_send_chart_matching(RRDSET *st) {
- // Do not stream anomaly rates charts.
- if (unlikely(rrdset_is_ar_chart(st)))
- return false;
-
- if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION))
- return ml_streaming_enabled();
+static inline bool should_send_chart_matching(RRDSET *st) {
+ RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE);
- if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) {
+ if(unlikely(!flags)) {
RRDHOST *host = st->rrdhost;
- if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
+ // Do not stream anomaly rates charts.
+ if (unlikely(rrdset_is_ar_chart(st))) {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ flags = RRDSET_FLAG_UPSTREAM_IGNORE;
+ }
+ else if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION)) {
+ if(ml_streaming_enabled()) {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+ flags = RRDSET_FLAG_UPSTREAM_SEND;
+ }
+ else {
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
+ rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ flags = RRDSET_FLAG_UPSTREAM_IGNORE;
+ }
+ }
+ else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) ||
simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st))) {
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND);
+ flags = RRDSET_FLAG_UPSTREAM_SEND;
}
else {
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE);
+ flags = RRDSET_FLAG_UPSTREAM_IGNORE;
}
}
- return(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND));
+ return flags & RRDSET_FLAG_UPSTREAM_SEND;
}
int configured_as_parent() {
@@ -173,22 +188,7 @@ int configured_as_parent() {
return is_parent;
}
-// checks if the current chart definition has been sent
-static inline int need_to_send_chart_definition(RRDSET *st) {
- if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))))
- return 1;
-
- RRDDIM *rd;
- dfe_start_read(st->rrddim_root_index, rd) {
- if(unlikely(!rd->exposed)) {
- internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
- return 1;
- }
- }
- dfe_done(rd);
-
- return 0;
-}
+#define need_to_send_chart_definition(st) (!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED))
// chart labels
static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
@@ -276,22 +276,42 @@ static inline void rrdpush_send_chart_definition(RRDSET *st) {
// sends the current chart dimensions
static inline bool rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) {
RRDHOST *host = st->rrdhost;
- buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", rrdset_id(st), (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
- if (s->version >= VERSION_GAP_FILLING)
- buffer_sprintf(host->sender->build, " %"PRId64"\n", (int64_t)st->last_collected_time.tv_sec);
- else
- buffer_strcat(host->sender->build, "\n");
+ BUFFER *wb = host->sender->build;
+
+ buffer_fast_strcat(wb, "BEGIN \"", 7);
+ buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
+ buffer_fast_strcat(wb, "\" ", 2);
+ buffer_print_llu(wb, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0);
+
+ if (s->version >= VERSION_GAP_FILLING) {
+ buffer_fast_strcat(wb, " ", 1);
+ buffer_print_ll(wb, st->last_collected_time.tv_sec);
+ }
+
+ buffer_fast_strcat(wb, "\n", 1);
size_t count_of_dimensions_written = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if(rd->updated && rd->exposed) {
- buffer_sprintf(host->sender->build, "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n", rrddim_id(rd), rd->collected_value);
+ if(unlikely(!rd->updated))
+ continue;
+
+ if(likely(rd->exposed)) {
+ buffer_fast_strcat(wb, "SET \"", 5);
+ buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
+ buffer_fast_strcat(wb, "\" = ", 4);
+ buffer_print_ll(wb, rd->collected_value);
+ buffer_fast_strcat(wb, "\n", 1);
count_of_dimensions_written++;
}
+ else {
+ internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd));
+ // we will include it in the next iteration
+ rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ }
}
rrddim_foreach_done(rd);
- buffer_strcat(host->sender->build, "END\n");
+ buffer_fast_strcat(wb, "END\n", 4);
return count_of_dimensions_written != 0;
}
@@ -350,15 +370,16 @@ void rrdset_done_push(RRDSET *st) {
RRDHOST *host = st->rrdhost;
- if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn))
- rrdpush_sender_thread_spawn(host);
-
// Handle non-connected case
if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST)
|| !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS))) {
+ if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn))
+ rrdpush_sender_thread_spawn(host);
+
if(unlikely(!host->rrdpush_sender_error_shown))
error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
+
host->rrdpush_sender_error_shown = 1;
return;
@@ -368,15 +389,12 @@ void rrdset_done_push(RRDSET *st) {
host->rrdpush_sender_error_shown = 0;
}
- if(dictionary_entries(st->rrddim_root_index) == 0)
- return;
-
sender_start(host->sender);
- if(need_to_send_chart_definition(st))
+ if(unlikely(need_to_send_chart_definition(st)))
rrdpush_send_chart_definition(st);
- if(rrdpush_send_chart_metrics_nolock(st, host->sender)) {
+ if(likely(rrdpush_send_chart_metrics_nolock(st, host->sender))) {
// signal the sender there are more data
if (host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
error("STREAM %s [send]: cannot write to internal pipe", rrdhost_hostname(host));