summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorthiagoftsm <thiagoftsm@gmail.com>2020-01-14 10:27:22 +0000
committerTimo <timotej@netdata.cloud>2020-01-14 11:27:22 +0100
commitef2b11fcb4d56ec946f6dc24929ba6ec0b54d0f2 (patch)
tree5ac5e89b35404bdabc08162505f9ad02c86df7c3 /streaming
parentbb51e824f97cd135674e2940bdbd5458fbfba15f (diff)
Stream with labels (#7549)
This commit enables streaming host labels
Diffstat (limited to 'streaming')
-rw-r--r--streaming/rrdpush.c84
-rw-r--r--streaming/rrdpush.h1
-rw-r--r--streaming/stream.conf1
3 files changed, 78 insertions, 8 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index c88ee717ad..c6c391e4b1 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -25,7 +25,9 @@
*
*/
+#define STREAMING_PROTOCOL_VERSION "1.1"
#define START_STREAMING_PROMPT "Hit me baby, push them over..."
+#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
typedef enum {
RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
@@ -79,6 +81,7 @@ int rrdpush_init() {
default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
+
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
error("STREAM [send]: cannot enable sending thread - information is missing.");
default_rrdpush_enabled = 0;
@@ -338,6 +341,36 @@ void rrdset_done_push(RRDSET *st) {
rrdpush_buffer_unlock(host);
}
+// labels
+void rrdpush_send_labels(RRDHOST *host) {
+ if (!host->labels || !(host->labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels_flag & LABEL_FLAG_STOP_STREAM))
+ return;
+
+ rrdpush_buffer_lock(host);
+ netdata_rwlock_rdlock(&host->labels_rwlock);
+
+ struct label *labels = host->labels;
+ while(labels) {
+ buffer_sprintf(host->rrdpush_sender_buffer
+ , "LABEL \"%s\" = %d %s\n"
+ , labels->key
+ , (int)labels->label_source
+ , labels->value);
+
+ labels = labels->next;
+ }
+
+ buffer_sprintf(host->rrdpush_sender_buffer
+ , "OVERWRITE %s\n", "labels");
+
+ netdata_rwlock_unlock(&host->labels_rwlock);
+
+ if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1)
+ error("STREAM %s [send]: cannot write to internal pipe", host->hostname);
+
+ rrdpush_buffer_unlock(host);
+ host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+}
// ----------------------------------------------------------------------------
// rrdpush sender thread
@@ -536,6 +569,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
"&NETDATA_SYSTEM_VIRT_DETECTION=%s"
"&NETDATA_SYSTEM_CONTAINER=%s"
"&NETDATA_SYSTEM_CONTAINER_DETECTION=%s"
+ "&NETDATA_PROTOCOL_VERSION=%s"
" HTTP/1.1\r\n"
"User-Agent: %s/%s\r\n"
"Accept: */*\r\n\r\n"
@@ -560,6 +594,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
, (host->system_info->virt_detection) ? host->system_info->virt_detection : ""
, (host->system_info->container) ? host->system_info->container : ""
, (host->system_info->container_detection) ? host->system_info->container_detection : ""
+ , STREAMING_PROTOCOL_VERSION
, host->program_name
, host->program_version
);
@@ -613,7 +648,20 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
return 0;
}
- if(strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)) != 0) {
+ int answer = strncmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
+ if(!answer) {
+ host->labels_flag |= LABEL_FLAG_UPDATE_STREAM;
+ host->labels_flag &= ~LABEL_FLAG_STOP_STREAM;
+ } else {
+ answer = strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
+ if(!answer) {
+ host->labels_flag |= LABEL_FLAG_STOP_STREAM;
+ host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ info("STREAM %s [send to %s]: is using an old Netdata.", host->hostname, connected_to);
+ }
+ }
+
+ if(answer != 0) {
error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, connected_to);
rrdpush_sender_thread_close_socket(host);
return 0;
@@ -814,6 +862,8 @@ void *rrdpush_sender_thread(void *ptr) {
}
if (ofd->revents & POLLOUT) {
+ rrdpush_send_labels(host);
+
if (begin < buffer_strlen(host->rrdpush_sender_buffer)) {
debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes, begin = %zu)...", buffer_strlen(host->rrdpush_sender_buffer), begin);
@@ -981,6 +1031,7 @@ static int rrdpush_receive(int fd
, int update_every
, char *client_ip
, char *client_port
+ , int stream_flags
#ifdef ENABLE_HTTPS
, struct netdata_ssl *ssl
#endif
@@ -1097,12 +1148,20 @@ static int rrdpush_receive(int fd
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port);
info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port);
+ char *initial_response;
+ if (stream_flags & LABEL_FLAG_UPDATE_STREAM) {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the newest stream protocol.", host->hostname, client_ip, client_port);
+ initial_response = START_STREAMING_PROMPT_V2;
+ } else {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using an old protocol.", host->hostname, client_ip, client_port);
+ initial_response = START_STREAMING_PROMPT;
+ }
#ifdef ENABLE_HTTPS
host->stream_ssl.conn = ssl->conn;
host->stream_ssl.flags = ssl->flags;
- if(send_timeout(ssl,fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
+ if(send_timeout(ssl, fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
#else
- if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
+ if(send_timeout(fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
#endif
log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "FAILED - CANNOT REPLY");
error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port);
@@ -1142,6 +1201,8 @@ static int rrdpush_receive(int fd
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
host->connected_senders++;
host->senders_disconnected_time = 0;
+ host->labels_flag = stream_flags;
+
if(health_enabled != CONFIG_BOOLEAN_NO) {
if(alarms_delay > 0) {
host->health_delay_up_to = now_realtime_sec() + alarms_delay;
@@ -1196,6 +1257,7 @@ struct rrdpush_thread {
char *program_version;
struct rrdhost_system_info *system_info;
int update_every;
+ int stream_flags;
#ifdef ENABLE_HTTPS
struct netdata_ssl ssl;
#endif
@@ -1251,6 +1313,7 @@ static void *rrdpush_receiver_thread(void *ptr) {
, rpt->update_every
, rpt->client_ip
, rpt->client_port
+ , rpt->stream_flags
#ifdef ENABLE_HTTPS
, &rpt->ssl
#endif
@@ -1300,6 +1363,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL;
int update_every = default_rrd_update_every;
char buf[GUID_LEN + 1];
+ int stream_flags = LABEL_FLAG_STOP_STREAM;
struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
@@ -1327,10 +1391,15 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
timezone = value;
else if(!strcmp(name, "tags"))
tags = value;
- else
- if(unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
- info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", w->client_ip, w->client_port, key, value);
- }
+ else {
+ if(!strcmp(name, "NETDATA_PROTOCOL_VERSION"))
+ stream_flags = LABEL_FLAG_UPDATE_STREAM;
+ else
+ if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
+ info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.",
+ w->client_ip, w->client_port, key, value);
+ }
+ }
}
if(!key || !*key) {
@@ -1444,6 +1513,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
rpt->client_port = strdupz(w->client_port);
rpt->update_every = update_every;
rpt->system_info = system_info;
+ rpt->stream_flags = stream_flags;
#ifdef ENABLE_HTTPS
rpt->ssl.conn = w->ssl.conn;
rpt->ssl.flags = w->ssl.flags;
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 7b1acd9e11..6d290a835b 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -17,6 +17,7 @@ extern int configured_as_master();
extern void rrdset_done_push(RRDSET *st);
extern void rrdset_push_chart_definition_now(RRDSET *st);
extern void *rrdpush_sender_thread(void *ptr);
+extern void rrdpush_send_labels(RRDHOST *host);
extern int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url);
extern void rrdpush_sender_thread_stop(RRDHOST *host);
diff --git a/streaming/stream.conf b/streaming/stream.conf
index fdff1f25fc..58dbcd8e68 100644
--- a/streaming/stream.conf
+++ b/streaming/stream.conf
@@ -87,7 +87,6 @@
# Sync the clock of the charts for that many iterations, when starting.
initial clock resync iterations = 60
-
# -----------------------------------------------------------------------------
# 2. ON MASTER NETDATA - THE ONE THAT WILL BE RECEIVING METRICS