diff options
-rw-r--r-- | database/rrddim.c | 6 | ||||
-rw-r--r-- | libnetdata/buffer/buffer.c | 12 | ||||
-rw-r--r-- | libnetdata/buffer/buffer.h | 1 | ||||
-rw-r--r-- | streaming/rrdpush.c | 102 |
4 files changed, 76 insertions, 45 deletions
diff --git a/database/rrddim.c b/database/rrddim.c index 1ee9a88b87..a06088ac56 100644 --- a/database/rrddim.c +++ b/database/rrddim.c @@ -363,8 +363,8 @@ inline int rrddim_set_algorithm(RRDSET *st, RRDDIM *rd, RRD_ALGORITHM algorithm) debug(D_RRD_CALLS, "Updating algorithm of dimension '%s/%s' from %s to %s", rrdset_id(st), rrddim_name(rd), rrd_algorithm_name(rd->algorithm), rrd_algorithm_name(algorithm)); rd->algorithm = algorithm; rd->exposed = 0; - rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK); rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK); rrdcontext_updated_rrddim_algorithm(rd); return 1; } @@ -376,8 +376,8 @@ inline int rrddim_set_multiplier(RRDSET *st, RRDDIM *rd, collected_number multip debug(D_RRD_CALLS, "Updating multiplier of dimension '%s/%s' from " COLLECTED_NUMBER_FORMAT " to " COLLECTED_NUMBER_FORMAT, rrdset_id(st), rrddim_name(rd), rd->multiplier, multiplier); rd->multiplier = multiplier; rd->exposed = 0; - rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK); rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK); rrdcontext_updated_rrddim_multiplier(rd); return 1; } @@ -389,8 +389,8 @@ inline int rrddim_set_divisor(RRDSET *st, RRDDIM *rd, collected_number divisor) debug(D_RRD_CALLS, "Updating divisor of dimension '%s/%s' from " COLLECTED_NUMBER_FORMAT " to " COLLECTED_NUMBER_FORMAT, rrdset_id(st), rrddim_name(rd), rd->divisor, divisor); rd->divisor = divisor; rd->exposed = 0; - rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK); rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + rrdset_flag_set(st, RRDSET_FLAG_HOMOGENEOUS_CHECK); rrdcontext_updated_rrddim_divisor(rd); return 1; } diff --git a/libnetdata/buffer/buffer.c b/libnetdata/buffer/buffer.c index 8a32184f60..3d29c30255 100644 --- a/libnetdata/buffer/buffer.c +++ b/libnetdata/buffer/buffer.c @@ -136,6 +136,18 @@ void buffer_print_llu(BUFFER *wb, unsigned long long uvalue) wb->len += wstr - str; } +void buffer_print_ll(BUFFER *wb, long long value) +{ + buffer_need_bytes(wb, 50); + + if(value < 0) { + buffer_fast_strcat(wb, "-", 1); + value = -value; + } + + buffer_print_llu(wb, value); +} + void buffer_fast_strcat(BUFFER *wb, const char *txt, size_t len) { if(unlikely(!txt || !*txt)) return; diff --git a/libnetdata/buffer/buffer.h b/libnetdata/buffer/buffer.h index 42425b4cb7..c134cc9aca 100644 --- a/libnetdata/buffer/buffer.h +++ b/libnetdata/buffer/buffer.h @@ -78,6 +78,7 @@ extern char *print_number_llu_r(char *str, unsigned long long uvalue); extern char *print_number_llu_r_smart(char *str, unsigned long long uvalue); extern void buffer_print_llu(BUFFER *wb, unsigned long long uvalue); +extern void buffer_print_ll(BUFFER *wb, long long value); static inline void buffer_need_bytes(BUFFER *buffer, size_t needed_free_size) { if(unlikely(buffer->size - buffer->len < needed_free_size)) diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 514bb59c92..9fa9793de2 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -129,29 +129,44 @@ int rrdpush_init() { unsigned int remote_clock_resync_iterations = 60; -static inline int should_send_chart_matching(RRDSET *st) { - // Do not stream anomaly rates charts. - if (unlikely(rrdset_is_ar_chart(st))) - return false; - - if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION)) - return ml_streaming_enabled(); +static inline bool should_send_chart_matching(RRDSET *st) { + RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE); - if(!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND|RRDSET_FLAG_UPSTREAM_IGNORE)) { + if(unlikely(!flags)) { RRDHOST *host = st->rrdhost; - if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) || + // Do not stream anomaly rates charts. + if (unlikely(rrdset_is_ar_chart(st))) { + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND); + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); + flags = RRDSET_FLAG_UPSTREAM_IGNORE; + } + else if (rrdset_flag_check(st, RRDSET_FLAG_ANOMALY_DETECTION)) { + if(ml_streaming_enabled()) { + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE); + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); + flags = RRDSET_FLAG_UPSTREAM_SEND; + } + else { + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND); + rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); + flags = RRDSET_FLAG_UPSTREAM_IGNORE; + } + } + else if(simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_id(st)) || simple_pattern_matches(host->rrdpush_send_charts_matching, rrdset_name(st))) { rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE); rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_SEND); + flags = RRDSET_FLAG_UPSTREAM_SEND; } else { rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND); rrdset_flag_set(st, RRDSET_FLAG_UPSTREAM_IGNORE); + flags = RRDSET_FLAG_UPSTREAM_IGNORE; } } - return(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_SEND)); + return flags & RRDSET_FLAG_UPSTREAM_SEND; } int configured_as_parent() { @@ -173,22 +188,7 @@ int configured_as_parent() { return is_parent; } -// checks if the current chart definition has been sent -static inline int need_to_send_chart_definition(RRDSET *st) { - if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED)))) - return 1; - - RRDDIM *rd; - dfe_start_read(st->rrddim_root_index, rd) { - if(unlikely(!rd->exposed)) { - internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' triggered chart refresh to upstream", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); - return 1; - } - } - dfe_done(rd); - - return 0; -} +#define need_to_send_chart_definition(st) (!rrdset_flag_check(st, RRDSET_FLAG_UPSTREAM_EXPOSED)) // chart labels static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { @@ -276,22 +276,42 @@ static inline void rrdpush_send_chart_definition(RRDSET *st) { // sends the current chart dimensions static inline bool rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) { RRDHOST *host = st->rrdhost; - buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", rrdset_id(st), (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0); - if (s->version >= VERSION_GAP_FILLING) - buffer_sprintf(host->sender->build, " %"PRId64"\n", (int64_t)st->last_collected_time.tv_sec); - else - buffer_strcat(host->sender->build, "\n"); + BUFFER *wb = host->sender->build; + + buffer_fast_strcat(wb, "BEGIN \"", 7); + buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); + buffer_fast_strcat(wb, "\" ", 2); + buffer_print_llu(wb, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0); + + if (s->version >= VERSION_GAP_FILLING) { + buffer_fast_strcat(wb, " ", 1); + buffer_print_ll(wb, st->last_collected_time.tv_sec); + } + + buffer_fast_strcat(wb, "\n", 1); size_t count_of_dimensions_written = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) { - if(rd->updated && rd->exposed) { - buffer_sprintf(host->sender->build, "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n", rrddim_id(rd), rd->collected_value); + if(unlikely(!rd->updated)) + continue; + + if(likely(rd->exposed)) { + buffer_fast_strcat(wb, "SET \"", 5); + buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); + buffer_fast_strcat(wb, "\" = ", 4); + buffer_print_ll(wb, rd->collected_value); + buffer_fast_strcat(wb, "\n", 1); count_of_dimensions_written++; } + else { + internal_error(true, "host '%s', chart '%s', dimension '%s' flag 'exposed' is updated but not exposed", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(rd)); + // we will include it in the next iteration + rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + } } rrddim_foreach_done(rd); - buffer_strcat(host->sender->build, "END\n"); + buffer_fast_strcat(wb, "END\n", 4); return count_of_dimensions_written != 0; } @@ -350,15 +370,16 @@ void rrdset_done_push(RRDSET *st) { RRDHOST *host = st->rrdhost; - if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn)) - rrdpush_sender_thread_spawn(host); - // Handle non-connected case if(unlikely(!__atomic_load_n(&host->rrdpush_sender_connected, __ATOMIC_SEQ_CST) || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_COLLECTED_METRICS))) { + if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn)) + rrdpush_sender_thread_spawn(host); + if(unlikely(!host->rrdpush_sender_error_shown)) error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host)); + host->rrdpush_sender_error_shown = 1; return; @@ -368,15 +389,12 @@ void rrdset_done_push(RRDSET *st) { host->rrdpush_sender_error_shown = 0; } - if(dictionary_entries(st->rrddim_root_index) == 0) - return; - sender_start(host->sender); - if(need_to_send_chart_definition(st)) + if(unlikely(need_to_send_chart_definition(st))) rrdpush_send_chart_definition(st); - if(rrdpush_send_chart_metrics_nolock(st, host->sender)) { + if(likely(rrdpush_send_chart_metrics_nolock(st, host->sender))) { // signal the sender there are more data if (host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) error("STREAM %s [send]: cannot write to internal pipe", rrdhost_hostname(host)); |