summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--collectors/plugins.d/plugins_d.c40
-rw-r--r--collectors/plugins.d/plugins_d.h2
-rw-r--r--database/rrd.h6
-rw-r--r--database/rrdhost.c33
-rw-r--r--streaming/rrdpush.c84
-rw-r--r--streaming/rrdpush.h1
-rw-r--r--streaming/stream.conf1
7 files changed, 150 insertions, 17 deletions
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index 32c5840328..7ac17587a9 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -234,9 +234,12 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
uint32_t DIMENSION_HASH = simple_hash(PLUGINSD_KEYWORD_DIMENSION);
uint32_t DISABLE_HASH = simple_hash(PLUGINSD_KEYWORD_DISABLE);
uint32_t VARIABLE_HASH = simple_hash(PLUGINSD_KEYWORD_VARIABLE);
+ uint32_t LABEL_HASH = simple_hash(PLUGINSD_KEYWORD_LABEL);
+ uint32_t OVERWRITE_HASH = simple_hash(PLUGINSD_KEYWORD_OVERWRITE);
RRDSET *st = NULL;
uint32_t hash;
+ struct label *new_labels = NULL;
errno = 0;
clearerr(fp);
@@ -283,6 +286,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
#else
r = fgets(line, PLUGINSD_LINE_MAX, fp);
#endif
+
if(unlikely(!r)) {
if(feof(fp))
error("read failed: end of file");
@@ -616,6 +620,39 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
enabled = 0;
break;
}
+ else if(likely(hash == LABEL_HASH && !strcmp(s, PLUGINSD_KEYWORD_LABEL))) {
+ debug(D_PLUGINSD, "requested a LABEL CHANGE");
+ char *store;
+ if(!words[4])
+ store = words[3];
+ else {
+ store = callocz(PLUGINSD_LINE_MAX + 1, sizeof(char));
+ char *move = store;
+ int i = 3;
+ while (i < w) {
+ size_t length = strlen(words[i]);
+ memcpy(move, words[i], length);
+ move += length;
+ *move++ = ' ';
+
+ i++;
+ if(!words[i])
+ break;
+ }
+ }
+
+ new_labels = add_label_to_list(new_labels, words[1], store, strtol(words[2], NULL, 10));
+ }
+ else if(likely(hash == OVERWRITE_HASH && !strcmp(s, PLUGINSD_KEYWORD_OVERWRITE))) {
+ debug(D_PLUGINSD, "requested a OVERWITE a variable");
+ if(!host->labels) {
+ host->labels = new_labels;
+ } else {
+ replace_label_list(host, new_labels);
+ }
+
+ new_labels = NULL;
+ }
else {
error("sent command '%s' which is not known by netdata, for host '%s'. Disabling it.", s, host->hostname);
enabled = 0;
@@ -626,6 +663,9 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
cleanup:
cd->enabled = enabled;
+ if(new_labels)
+ free_host_labels(new_labels);
+
if(likely(count)) {
cd->successful_collections += count;
cd->serial_failures = 0;
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index 7d5c7dda47..8134b99650 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -29,6 +29,8 @@
#define PLUGINSD_KEYWORD_FLUSH "FLUSH"
#define PLUGINSD_KEYWORD_DISABLE "DISABLE"
#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE"
+#define PLUGINSD_KEYWORD_LABEL "LABEL"
+#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE"
#define PLUGINSD_LINE_MAX 1024
#define PLUGINSD_LINE_MAX_SSL_READ 512
diff --git a/database/rrd.h b/database/rrd.h
index 0702fc1710..40efd60e92 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -156,6 +156,9 @@ typedef enum label_source {
LABEL_SOURCE_KUBERNETES = 4
} LABEL_SOURCE;
+#define LABEL_FLAG_UPDATE_STREAM 1
+#define LABEL_FLAG_STOP_STREAM 2
+
struct label {
char *key, *value;
uint32_t key_hash;
@@ -166,6 +169,8 @@ struct label {
char *translate_label_source(LABEL_SOURCE l);
struct label *create_label(char *key, char *value, LABEL_SOURCE label_source);
struct label *add_label_to_list(struct label *l, char *key, char *value, LABEL_SOURCE label_source);
+extern void replace_label_list(RRDHOST *host, struct label *new_labels);
+extern void free_host_labels(struct label *labels);
void reload_host_labels();
// ----------------------------------------------------------------------------
@@ -749,6 +754,7 @@ struct rrdhost {
// Support for host-level labels
struct label *labels;
netdata_rwlock_t labels_rwlock; // lock for the label list
+ uint32_t labels_flag; //Flags for labels
// ------------------------------------------------------------------------
// indexes
diff --git a/database/rrdhost.c b/database/rrdhost.c
index df4e364d9b..a7fac8569b 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -890,6 +890,26 @@ struct label *create_label(char *key, char *value, LABEL_SOURCE label_source)
return result;
}
+void free_host_labels(struct label *labels)
+{
+ while (labels != NULL)
+ {
+ struct label *current = labels;
+ labels = labels->next;
+ freez(current);
+ }
+}
+
+void replace_label_list(RRDHOST *host, struct label *new_labels)
+{
+ netdata_rwlock_wrlock(&host->labels_rwlock);
+ struct label *old_labels = host->labels;
+ host->labels = new_labels;
+ netdata_rwlock_unlock(&host->labels_rwlock);
+
+ free_host_labels(old_labels);
+}
+
struct label *add_label_to_list(struct label *l, char *key, char *value, LABEL_SOURCE label_source)
{
struct label *lab = create_label(key, value, label_source);
@@ -937,16 +957,11 @@ void reload_host_labels()
struct label *new_labels = merge_label_lists(from_auto, from_k8s);
new_labels = merge_label_lists(new_labels, from_config);
- netdata_rwlock_wrlock(&localhost->labels_rwlock);
- struct label *old_labels = localhost->labels;
- localhost->labels = new_labels;
- netdata_rwlock_unlock(&localhost->labels_rwlock);
+ replace_label_list(localhost, new_labels);
- while (old_labels != NULL)
- {
- struct label *current = old_labels;
- old_labels = old_labels->next;
- freez(current);
+ if(localhost->rrdpush_send_enabled && localhost->rrdpush_sender_buffer){
+ localhost->labels_flag |= LABEL_FLAG_UPDATE_STREAM;
+ rrdpush_send_labels(localhost);
}
health_reload();
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index c88ee717ad..c6c391e4b1 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -25,7 +25,9 @@
*
*/
+#define STREAMING_PROTOCOL_VERSION "1.1"
#define START_STREAMING_PROMPT "Hit me baby, push them over..."
+#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
typedef enum {
RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
@@ -79,6 +81,7 @@ int rrdpush_init() {
default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
+
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
error("STREAM [send]: cannot enable sending thread - information is missing.");
default_rrdpush_enabled = 0;
@@ -338,6 +341,36 @@ void rrdset_done_push(RRDSET *st) {
rrdpush_buffer_unlock(host);
}
+// labels
+void rrdpush_send_labels(RRDHOST *host) {
+ if (!host->labels || !(host->labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels_flag & LABEL_FLAG_STOP_STREAM))
+ return;
+
+ rrdpush_buffer_lock(host);
+ netdata_rwlock_rdlock(&host->labels_rwlock);
+
+ struct label *labels = host->labels;
+ while(labels) {
+ buffer_sprintf(host->rrdpush_sender_buffer
+ , "LABEL \"%s\" = %d %s\n"
+ , labels->key
+ , (int)labels->label_source
+ , labels->value);
+
+ labels = labels->next;
+ }
+
+ buffer_sprintf(host->rrdpush_sender_buffer
+ , "OVERWRITE %s\n", "labels");
+
+ netdata_rwlock_unlock(&host->labels_rwlock);
+
+ if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
+ error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+
+ rrdpush_buffer_unlock(host);
+ host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+}
// ----------------------------------------------------------------------------
// rrdpush sender thread
@@ -536,6 +569,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
"&NETDATA_SYSTEM_VIRT_DETECTION=%s"
"&NETDATA_SYSTEM_CONTAINER=%s"
"&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
+ "&NETDATA_PROTOCOL_VERSION=%s"
" HTTP/1.1\r\n"
"User-Agent: %s/%s\r\n"
"Accept: */*\r\n\r\n"
@@ -560,6 +594,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
, (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
, (host->system_info->container) ? host->system_info->container : ""
, (host->system_info->container_detection) ? host->system_info->container_detection : ""
+ , STREAMING_PROTOCOL_VERSION
, host->program_name
, host->program_version
);
@@ -613,7 +648,20 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
return 0;
}
- if(strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)) != 0) {
+ int answer = strncmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
+ if(!answer) {
+ host->labels_flag |= LABEL_FLAG_UPDATE_STREAM;
+ host->labels_flag &= ~LABEL_FLAG_STOP_STREAM;
+ } else {
+ answer = strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
+ if(!answer) {
+ host->labels_flag |= LABEL_FLAG_STOP_STREAM;
+ host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ info("STREAM %s [send to %s]: is using an old Netdata.", host->hostname, connected_to);
+ }
+ }
+
+ if(answer != 0) {
error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, connected_to);
rrdpush_sender_thread_close_socket(host);
return 0;
@@ -814,6 +862,8 @@ void *rrdpush_sender_thread(void *ptr) {
}
if (ofd->revents & POLLOUT) {
+ rrdpush_send_labels(host);
+
if (begin < buffer_strlen(host->rrdpush_sender_buffer)) {
debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes, begin = %zu)...", buffer_strlen(host->rrdpush_sender_buffer), begin);
@@ -981,6 +1031,7 @@ static int rrdpush_receive(int fd
, int update_every
, char *client_ip
, char *client_port
+ , int stream_flags
#ifdef ENABLE_HTTPS
, struct netdata_ssl *ssl
#endif
@@ -1097,12 +1148,20 @@ static int rrdpush_receive(int fd
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port);
info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port);
+ char *initial_response;
+ if (stream_flags & LABEL_FLAG_UPDATE_STREAM) {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the newest stream protocol.", host->hostname, client_ip, client_port);
+ initial_response = START_STREAMING_PROMPT_V2;
+ } else {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using an old protocol.", host->hostname, client_ip, client_port);
+ initial_response = START_STREAMING_PROMPT;
+ }
#ifdef ENABLE_HTTPS
host->stream_ssl.conn = ssl->conn;
host->stream_ssl.flags = ssl->flags;
- if(send_timeout(ssl,fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
+ if(send_timeout(ssl, fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
#else
- if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
+ if(send_timeout(fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
#endif
log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "FAILED - CANNOT REPLY");
error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port);
@@ -1142,6 +1201,8 @@ static int rrdpush_receive(int fd
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
host->connected_senders++;
host->senders_disconnected_time = 0;
+ host->labels_flag = stream_flags;
+
if(health_enabled != CONFIG_BOOLEAN_NO) {
if(alarms_delay > 0) {
host->health_delay_up_to = now_realtime_sec() + alarms_delay;
@@ -1196,6 +1257,7 @@ struct rrdpush_thread {
char *program_version;
struct rrdhost_system_info *system_info;
int update_every;
+ int stream_flags;
#ifdef ENABLE_HTTPS
struct netdata_ssl ssl;
#endif
@@ -1251,6 +1313,7 @@ static void *rrdpush_receiver_thread(void *ptr) {
, rpt->update_every
, rpt->client_ip
, rpt->client_port
+ , rpt->stream_flags
#ifdef ENABLE_HTTPS
, &rpt->ssl
#endif
@@ -1300,6 +1363,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL;
int update_every = default_rrd_update_every;
char buf[GUID_LEN + 1];
+ int stream_flags = LABEL_FLAG_STOP_STREAM;
struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
@@ -1327,10 +1391,15 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
timezone = value;
else if(!strcmp(name, "tags"))
tags = value;
- else
- if(unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
- info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", w->client_ip, w->client_port, key, value);
- }
+ else {
+ if(!strcmp(name, "NETDATA_PROTOCOL_VERSION"))
+ stream_flags = LABEL_FLAG_UPDATE_STREAM;
+ else
+ if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
+ info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.",
+ w->client_ip, w->client_port, key, value);
+ }
+ }
}
if(!key || !*key) {
@@ -1444,6 +1513,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
rpt->client_port = strdupz(w->client_port);
rpt->update_every = update_every;
rpt->system_info = system_info;
+ rpt->stream_flags = stream_flags;
#ifdef ENABLE_HTTPS
rpt->ssl.conn = w->ssl.conn;
rpt->ssl.flags = w->ssl.flags;
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 7b1acd9e11..6d290a835b 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -17,6 +17,7 @@ extern int configured_as_master();
extern void rrdset_done_push(RRDSET *st);
extern void rrdset_push_chart_definition_now(RRDSET *st);
extern void *rrdpush_sender_thread(void *ptr);
+extern void rrdpush_send_labels(RRDHOST *host);
extern int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url);
extern void rrdpush_sender_thread_stop(RRDHOST *host);
diff --git a/streaming/stream.conf b/streaming/stream.conf
index fdff1f25fc..58dbcd8e68 100644
--- a/streaming/stream.conf
+++ b/streaming/stream.conf
@@ -87,7 +87,6 @@
# Sync the clock of the charts for that many iterations, when starting.
initial clock resync iterations = 60
-
# -----------------------------------------------------------------------------
# 2. ON MASTER NETDATA - THE ONE THAT WILL BE RECEIVING METRICS