diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-06-13 20:35:45 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-13 20:35:45 +0300 |
commit | 1b0f6c6b2296dc082d85f38c298a61442dcf2490 (patch) | |
tree | 2cfee5101d9cae338d0635f44fe62b010f3548ee /streaming | |
parent | 4c64b8ea4ff720d946bbb9a11ca7474c5673bb6c (diff) |
Labels with dictionary (#13070)
* squashed and rebased to master
* fix overflow and single character bug in sanitize; include rrd.h instead of node_info.h
* added unittest for UTF-8 multibyte sanitization
* Fix unit test compilation
* Fix CMake build
* remove double sanitizer for opentsdb; cleanup sanitize_json_string()
* rename error_description to error_message to avoid conflict with json-c
* revert last and undef error_description from json-c
* more unittests; attempt to fix protobuf map issue
* get rid of rrdlabels_get() and replace it with a safe version that writes the value to a buffer
* added dictionary sorting unittest; rrdlabels_to_buffer() now is sorted
* better sorted dictionary checking
* proper unittesting for sorted dictionaries
* call dictionary deletion callback when destroying the dictionary
* remove obsolete variable
* Fix exporting unit tests
* Fix k8s label parsing test
* workaround for cmocka and strdupz()
* Bypass cmocka memory allocation check
* Revert "Bypass cmocka memory allocation check"
This reverts commit 4c49923839d9229bea23ca914dd8a0be1ebe2bf4.
* Revert "workaround for cmocka and strdupz()"
This reverts commit 7bebee04801db1865c748a7896d5fa54bb7104a5.
* Bypass cmocka memory allocation checks
* respect json formatting for chart labels
* cloud sends colons
* print the value only once
* allow parenthesis in values and spaces; make stream sender send quotes for values
Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/receiver.c | 9 | ||||
-rw-r--r-- | streaming/rrdpush.c | 48 | ||||
-rw-r--r-- | streaming/sender.c | 8 |
3 files changed, 28 insertions, 37 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index d20658e658..f29b16c432 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -660,7 +660,14 @@ static int rrdpush_receive(struct receiver_state *rpt) */ // rpt->host->connected_senders++; - rpt->host->labels.labels_flag = (rpt->stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM; + if(rpt->stream_version > 0) { + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); + } + else { + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP); + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); + } if(health_enabled != CONFIG_BOOLEAN_NO) { if(alarms_delay > 0) { diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 77774d8d3b..48d0079086 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -193,20 +193,15 @@ static inline int need_to_send_chart_definition(RRDSET *st) { } // chart labels +static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + BUFFER *wb = (BUFFER *)data; + buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls); + return 1; +} void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) { - struct label_index *labels_c = &st->state->labels; - if (labels_c) { - netdata_rwlock_rdlock(&host->labels.labels_rwlock); - struct label *lbl = labels_c->head; - while(lbl) { - buffer_sprintf(host->sender->build, - "CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source); - - lbl = lbl->next; - } - if (labels_c->head) + if (st->state && st->state->chart_labels) { + if(rrdlabels_walkthrough_read(st->state->chart_labels, send_clabels_callback, host->sender->build) > 0) buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n"); - netdata_rwlock_unlock(&host->labels.labels_rwlock); } } @@ -364,36 +359,25 @@ void rrdset_done_push(RRDSET *st) { } // labels +static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) { + BUFFER *wb = (BUFFER *)data; + buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value); + return 1; +} void rrdpush_send_labels(RRDHOST *host) { - if (!host->labels.head || !(host->labels.labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels.labels_flag & LABEL_FLAG_STOP_STREAM)) + if (!host->host_labels || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE) || (rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_STOP))) return; sender_start(host->sender); - rrdhost_rdlock(host); - netdata_rwlock_rdlock(&host->labels.labels_rwlock); - - struct label *label_i = host->labels.head; - while(label_i) { - buffer_sprintf(host->sender->build - , "LABEL \"%s\" = %d %s\n" - , label_i->key - , (int)label_i->label_source - , label_i->value); - - label_i = label_i->next; - } - - buffer_sprintf(host->sender->build - , "OVERWRITE %s\n", "labels"); - netdata_rwlock_unlock(&host->labels.labels_rwlock); - rrdhost_unlock(host); + rrdlabels_walkthrough_read(host->host_labels, send_labels_callback, host->sender->build); + buffer_sprintf(host->sender->build, "OVERWRITE %s\n", "labels"); sender_commit(host->sender); if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) error("STREAM %s [send]: cannot write to internal pipe", host->hostname); - host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; + rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); } void rrdpush_claimed_id(RRDHOST *host) diff --git a/streaming/sender.c b/streaming/sender.c index a95cc86734..0ce67bc7d0 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -172,8 +172,8 @@ static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) { } static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) { - host->labels.labels_flag |= LABEL_FLAG_UPDATE_STREAM; - host->labels.labels_flag &= ~LABEL_FLAG_STOP_STREAM; + rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); + rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_STOP); } void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) @@ -236,8 +236,8 @@ static inline long int parse_stream_version(RRDHOST *host, char *http) answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)); if (!answer) { stream_version = 0; - host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM; - host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; + rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_STOP); + rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE); } else { stream_version = parse_stream_version_for_errors(http); |