summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorEmmanuel Vasilakis <mrzammler@mm.st>2021-10-25 11:32:22 +0300
committerGitHub <noreply@github.com>2021-10-25 11:32:22 +0300
commit7e9a2cbb0ba29a8144d30d20be43178faa0a7668 (patch)
tree1e3cdb9bc91b79cc3c0562274483c1c49e93d391 /streaming
parent6f54cd3116d1714b80f2204ce0c80a08595ef88a (diff)
Stream chart labels (#11675)
* stream chart labels * update stream protocol to 4 * only send CLABEL_COMMIT when there are labels * mark host as UNUSED * log error for stray CLABEL_COMMIT * remove commented define
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c2
-rw-r--r--streaming/rrdpush.c22
-rw-r--r--streaming/rrdpush.h6
3 files changed, 27 insertions, 3 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 6771387920..bb7dca77c1 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -221,6 +221,8 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp
parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
parser->plugins_action->chart_action = &pluginsd_chart_action;
parser->plugins_action->set_action = &pluginsd_set_action;
+ parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action;
+ parser->plugins_action->clabel_action = &pluginsd_clabel_action;
user->parser = parser;
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 038383b55b..53a8976999 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -183,6 +183,24 @@ static inline int need_to_send_chart_definition(RRDSET *st) {
return 0;
}
+// chart labels
+void rrdpush_send_clabels(RRDHOST *host, RRDSET *st) {
+ struct label_index *labels_c = &st->state->labels;
+ if (labels_c) {
+ netdata_rwlock_rdlock(&host->labels.labels_rwlock);
+ struct label *lbl = labels_c->head;
+ while(lbl) {
+ buffer_sprintf(host->sender->build,
+ "CLABEL \"%s\" \"%s\" %d\n", lbl->key, lbl->value, (int)lbl->label_source);
+
+ lbl = lbl->next;
+ }
+ if (labels_c->head)
+ buffer_sprintf(host->sender->build,"CLABEL_COMMIT\n");
+ netdata_rwlock_unlock(&host->labels.labels_rwlock);
+ }
+}
+
// Send the current chart definition.
// Assumes that collector thread has already called sender_start for mutex / buffer state.
static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
@@ -224,6 +242,10 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) {
, (st->module_name)?st->module_name:""
);
+ // send the chart labels
+ if (host->sender->version >= STREAM_VERSION_CLABELS)
+ rrdpush_send_clabels(host, st);
+
// send the dimensions
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 7e07c8916f..027ccd102a 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -10,10 +10,10 @@
#define CONNECTED_TO_SIZE 100
-// #define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4 Gap-filling
-#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)3
-#define VERSION_GAP_FILLING 4
+#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)4
#define STREAM_VERSION_CLAIM 3
+#define STREAM_VERSION_CLABELS 4
+#define VERSION_GAP_FILLING 5
#define STREAMING_PROTOCOL_VERSION "1.1"
#define START_STREAMING_PROMPT "Hit me baby, push them over..."