summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-06-13 20:35:45 +0300
committerGitHub <noreply@github.com>2022-06-13 20:35:45 +0300
commit1b0f6c6b2296dc082d85f38c298a61442dcf2490 (patch)
tree2cfee5101d9cae338d0635f44fe62b010f3548ee /streaming
parent4c64b8ea4ff720d946bbb9a11ca7474c5673bb6c (diff)
Labels with dictionary (#13070)
* squashed and rebased to master * fix overflow and single character bug in sanitize; include rrd.h instead of node_info.h * added unittest for UTF-8 multibyte sanitization * Fix unit test compilation * Fix CMake build * remove double sanitizer for opentsdb; cleanup sanitize_json_string() * rename error_description to error_message to avoid conflict with json-c * revert last and undef error_description from json-c * more unittests; attempt to fix protobuf map issue * get rid of rrdlabels_get() and replace it with a safe version that writes the value to a buffer * added dictionary sorting unittest; rrdlabels_to_buffer() now is sorted * better sorted dictionary checking * proper unittesting for sorted dictionaries * call dictionary deletion callback when destroying the dictionary * remove obsolete variable * Fix exporting unit tests * Fix k8s label parsing test * workaround for cmocka and strdupz() * Bypass cmocka memory allocation check * Revert "Bypass cmocka memory allocation check" This reverts commit 4c49923839d9229bea23ca914dd8a0be1ebe2bf4. * Revert "workaround for cmocka and strdupz()" This reverts commit 7bebee04801db1865c748a7896d5fa54bb7104a5. * Bypass cmocka memory allocation checks * respect json formatting for chart labels * cloud sends colons * print the value only once * allow parenthesis in values and spaces; make stream sender send quotes for values Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c9
-rw-r--r--streaming/rrdpush.c48
-rw-r--r--streaming/sender.c8
3 files changed, 28 insertions, 37 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index d20658e658..f29b16c432 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -660,7 +660,14 @@ static int rrdpush_receive(struct receiver_state *rpt)
*/
// rpt->host->connected_senders++;
- rpt->host->labels.labels_flag = (rpt->stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM;
+ if(rpt->stream_version > 0) {
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
+ rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
+ }
+ else {
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_STREAM_LABELS_STOP);
+ rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
+ }
if(health_enabled != CONFIG_BOOLEAN_NO) {
if(alarms_delay > 0) {
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 77774d8d3b..48d0079086 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -193,20 +193,15 @@ static inline int need_to_send_chart_definition(RRDSET *st) {
}
// chart labels
+static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls);
+ return 1;
+}
void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) {
- struct label_index *labels_c = &st->state->labels;
- if (labels_c) {
- netdata_rwlock_rdlock(&host->labels.labels_rwlock);
- struct label *lbl = labels_c->head;
- while(lbl) {
- buffer_sprintf(host->sender->build,
- "CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source);
-
- lbl = lbl->next;
- }
- if (labels_c->head)
+ if (st->state && st->state->chart_labels) {
+ if(rrdlabels_walkthrough_read(st->state->chart_labels, send_clabels_callback, host->sender->build) > 0)
buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n");
- netdata_rwlock_unlock(&host->labels.labels_rwlock);
}
}
@@ -364,36 +359,25 @@ void rrdset_done_push(RRDSET *st) {
}
// labels
+static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
+ return 1;
+}
void rrdpush_send_labels(RRDHOST *host) {
- if (!host->labels.head || !(host->labels.labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels.labels_flag & LABEL_FLAG_STOP_STREAM))
+ if (!host->host_labels || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE) || (rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_STOP)))
return;
sender_start(host->sender);
- rrdhost_rdlock(host);
- netdata_rwlock_rdlock(&host->labels.labels_rwlock);
-
- struct label *label_i = host->labels.head;
- while(label_i) {
- buffer_sprintf(host->sender->build
- , "LABEL \"%s\" = %d %s\n"
- , label_i->key
- , (int)label_i->label_source
- , label_i->value);
-
- label_i = label_i->next;
- }
-
- buffer_sprintf(host->sender->build
- , "OVERWRITE %s\n", "labels");
- netdata_rwlock_unlock(&host->labels.labels_rwlock);
- rrdhost_unlock(host);
+ rrdlabels_walkthrough_read(host->host_labels, send_labels_callback, host->sender->build);
+ buffer_sprintf(host->sender->build, "OVERWRITE %s\n", "labels");
sender_commit(host->sender);
if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
- host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
}
void rrdpush_claimed_id(RRDHOST *host)
diff --git a/streaming/sender.c b/streaming/sender.c
index a95cc86734..0ce67bc7d0 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -172,8 +172,8 @@ static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
}
static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) {
- host->labels.labels_flag |= LABEL_FLAG_UPDATE_STREAM;
- host->labels.labels_flag &= ~LABEL_FLAG_STOP_STREAM;
+ rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_STOP);
}
void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
@@ -236,8 +236,8 @@ static inline long int parse_stream_version(RRDHOST *host, char *http)
answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
if (!answer) {
stream_version = 0;
- host->labels.labels_flag |= LABEL_FLAG_STOP_STREAM;
- host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ rrdhost_flag_set(host, RRDHOST_FLAG_STREAM_LABELS_STOP);
+ rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
}
else {
stream_version = parse_stream_version_for_errors(http);