summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r--streaming/rrdpush.c48
1 files changed, 16 insertions, 32 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 77774d8d3b..48d0079086 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -193,20 +193,15 @@ static inline int need_to_send_chart_definition(RRDSET *st) {
}
// chart labels
+static int send_clabels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, "CLABEL \"%s\" \"%s\" %d\n", name, value, ls);
+ return 1;
+}
void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) {
- struct label_index *labels_c = &st->state->labels;
- if (labels_c) {
- netdata_rwlock_rdlock(&host->labels.labels_rwlock);
- struct label *lbl = labels_c->head;
- while(lbl) {
- buffer_sprintf(host->sender->build,
- "CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source);
-
- lbl = lbl->next;
- }
- if (labels_c->head)
+ if (st->state && st->state->chart_labels) {
+ if(rrdlabels_walkthrough_read(st->state->chart_labels, send_clabels_callback, host->sender->build) > 0)
buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n");
- netdata_rwlock_unlock(&host->labels.labels_rwlock);
}
}
@@ -364,36 +359,25 @@ void rrdset_done_push(RRDSET *st) {
}
// labels
+static int send_labels_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
+ BUFFER *wb = (BUFFER *)data;
+ buffer_sprintf(wb, "LABEL \"%s\" = %d \"%s\"\n", name, ls, value);
+ return 1;
+}
void rrdpush_send_labels(RRDHOST *host) {
- if (!host->labels.head || !(host->labels.labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels.labels_flag & LABEL_FLAG_STOP_STREAM))
+ if (!host->host_labels || !rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE) || (rrdhost_flag_check(host, RRDHOST_FLAG_STREAM_LABELS_STOP)))
return;
sender_start(host->sender);
- rrdhost_rdlock(host);
- netdata_rwlock_rdlock(&host->labels.labels_rwlock);
-
- struct label *label_i = host->labels.head;
- while(label_i) {
- buffer_sprintf(host->sender->build
- , "LABEL \"%s\" = %d %s\n"
- , label_i->key
- , (int)label_i->label_source
- , label_i->value);
-
- label_i = label_i->next;
- }
-
- buffer_sprintf(host->sender->build
- , "OVERWRITE %s\n", "labels");
- netdata_rwlock_unlock(&host->labels.labels_rwlock);
- rrdhost_unlock(host);
+ rrdlabels_walkthrough_read(host->host_labels, send_labels_callback, host->sender->build);
+ buffer_sprintf(host->sender->build, "OVERWRITE %s\n", "labels");
sender_commit(host->sender);
if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
- host->labels.labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ rrdhost_flag_clear(host, RRDHOST_FLAG_STREAM_LABELS_UPDATE);
}
void rrdpush_claimed_id(RRDHOST *host)