summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorthiagoftsm <thiagoftsm@gmail.com>2020-02-05 20:13:09 +0000
committerGitHub <noreply@github.com>2020-02-05 20:13:08 +0000
commitd9f977b864a8abc2213201817a93a041c30d17b4 (patch)
treeb998fefd6408fb658001ffa4f521beac02fd04df /streaming
parent694b960620f5ea164c9a8dd2a37cd680f9a48fac (diff)
Stream with version (#7851)
* stream_forward: Fix protocol This commit brings the necessary fixes to the protocol * stream_forward: Fix old slave support This commit fixes the communication with old versions of Netdata * stream_forward: Remove declaration There was a wrong declaration inside a block, so I am removing it * stream_forward: USe version This commit brings the use of version instead flags to stream * stream_forward: Remove variable This commit removes useless variable from hand shake * stream_forward: Change message Change the message setting the protocol version on it * stream_forward: Fix version number * stream_forward: readable definition The definition and the variables were using the same data type, but with different declaration, this commit fixes this. * stream_forward: Set master version inside message This commit updates the message used that there was a successfull connection with master * stream_forward: FIx wrong version This commit fixes the multiple set for stream version * stream_forward: Reorganize code This commit reorganizes code to speed up the processing * stream_forward: Adjust code This commit removes an unecessary else * stream_forward: Brings old structure This commits returns a previous necessary to the code * stream_forward: fix error report This commit fixes the error report that was happening when the stream version does not match * stream_forward: Fixes msg and remove unecessary call
Diffstat (limited to 'streaming')
-rw-r--r--streaming/rrdpush.c126
-rw-r--r--streaming/rrdpush.h2
2 files changed, 83 insertions, 45 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 59ae0b1537..701fe39953 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -28,6 +28,7 @@
#define STREAMING_PROTOCOL_VERSION "1.1"
#define START_STREAMING_PROMPT "Hit me baby, push them over..."
#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
+#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="
typedef enum {
RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW,
@@ -496,6 +497,11 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
}
}
+static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) {
+ host->labels_flag |= LABEL_FLAG_UPDATE_STREAM;
+ host->labels_flag &= ~LABEL_FLAG_STOP_STREAM;
+}
+
//called from client side
static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_port, int timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size) {
struct timeval tv = {
@@ -560,7 +566,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
#define HTTP_HEADER_SIZE 8192
char http[HTTP_HEADER_SIZE + 1];
int eol = snprintfz(http, HTTP_HEADER_SIZE,
- "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s"
+ "STREAM key=%s&hostname=%s&registry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s&ver=%u"
"&NETDATA_SYSTEM_OS_NAME=%s"
"&NETDATA_SYSTEM_OS_ID=%s"
"&NETDATA_SYSTEM_OS_ID_LIKE=%s"
@@ -586,6 +592,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
, host->os
, host->timezone
, (host->tags) ? host->tags : ""
+ , STREAMING_PROTOCOL_CURRENT_VERSION
, (host->system_info->host_os_name) ? host->system_info->host_os_name : ""
, (host->system_info->host_os_id) ? host->system_info->host_os_id : ""
, (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : ""
@@ -643,26 +650,44 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, connected_to);
+ ssize_t received;
#ifdef ENABLE_HTTPS
- if(recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout) == -1) {
+ received = recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout);
+ if(received == -1) {
#else
- if(recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout) == -1) {
+ received = recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout);
+ if(received == -1) {
#endif
error("STREAM %s [send to %s]: remote netdata does not respond.", host->hostname, connected_to);
rrdpush_sender_thread_close_socket(host);
return 0;
}
- int answer = strncmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
- if(!answer) {
- host->labels_flag |= LABEL_FLAG_UPDATE_STREAM;
- host->labels_flag &= ~LABEL_FLAG_STOP_STREAM;
+ http[received] = '\0';
+ int answer = -1;
+ char *version_start = strchr(http, '=');
+ uint32_t version;
+ if(version_start) {
+ version_start++;
+ version = (uint32_t)strtol(version_start, NULL, 10);
+ answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(version_start - http));
+ if(!answer) {
+ rrdpush_set_flags_to_newest_stream(host);
+ host->stream_version = version;
+ }
} else {
- answer = strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
+ answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2));
if(!answer) {
- host->labels_flag |= LABEL_FLAG_STOP_STREAM;
- host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
- info("STREAM %s [send to %s]: is using an old Netdata.", host->hostname, connected_to);
+ version = 1;
+ rrdpush_set_flags_to_newest_stream(host);
+ }
+ else {
+ answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT));
+ if(!answer) {
+ version = 0;
+ host->labels_flag |= LABEL_FLAG_STOP_STREAM;
+ host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM;
+ }
}
}
@@ -672,7 +697,10 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po
return 0;
}
- info("STREAM %s [send to %s]: established communication - ready to send metrics...", host->hostname, connected_to);
+ info("STREAM %s [send to %s]: established communication with a master using protocol version %u - ready to send metrics..."
+ , host->hostname
+ , connected_to
+ , version);
if(sock_setnonblock(host->rrdpush_sender_socket) < 0)
error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, connected_to);
@@ -1036,7 +1064,7 @@ static int rrdpush_receive(int fd
, int update_every
, char *client_ip
, char *client_port
- , int stream_flags
+ , uint32_t stream_version
#ifdef ENABLE_HTTPS
, struct netdata_ssl *ssl
#endif
@@ -1153,15 +1181,18 @@ static int rrdpush_receive(int fd
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port);
info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port);
- char *initial_response;
- if (stream_flags & LABEL_FLAG_UPDATE_STREAM) {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using the newest stream protocol.", host->hostname, client_ip, client_port);
- initial_response = START_STREAMING_PROMPT_V2;
+ char initial_response[HTTP_HEADER_SIZE];
+ if (stream_version > 1) {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", host->hostname, client_ip, client_port, stream_version);
+ sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, stream_version);
+ } else if (stream_version == 1) {
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", host->hostname, client_ip, client_port, stream_version);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2);
} else {
- info("STREAM %s [receive from [%s]:%s]: Netdata is using an old protocol.", host->hostname, client_ip, client_port);
- initial_response = START_STREAMING_PROMPT;
+ info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", host->hostname, client_ip, client_port);
+ sprintf(initial_response, "%s", START_STREAMING_PROMPT);
}
-#ifdef ENABLE_HTTPS
+ #ifdef ENABLE_HTTPS
host->stream_ssl.conn = ssl->conn;
host->stream_ssl.flags = ssl->flags;
if(send_timeout(ssl, fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
@@ -1206,7 +1237,7 @@ static int rrdpush_receive(int fd
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
host->connected_senders++;
host->senders_disconnected_time = 0;
- host->labels_flag = stream_flags;
+ host->labels_flag = (stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM;
if(health_enabled != CONFIG_BOOLEAN_NO) {
if(alarms_delay > 0) {
@@ -1262,7 +1293,7 @@ struct rrdpush_thread {
char *program_version;
struct rrdhost_system_info *system_info;
int update_every;
- int stream_flags;
+ uint32_t stream_version;
#ifdef ENABLE_HTTPS
struct netdata_ssl ssl;
#endif
@@ -1318,7 +1349,7 @@ static void *rrdpush_receiver_thread(void *ptr) {
, rpt->update_every
, rpt->client_ip
, rpt->client_port
- , rpt->stream_flags
+ , rpt->stream_version
#ifdef ENABLE_HTTPS
, &rpt->ssl
#endif
@@ -1367,8 +1398,8 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL;
int update_every = default_rrd_update_every;
+ uint32_t stream_version = UINT_MAX;
char buf[GUID_LEN + 1];
- int stream_flags = LABEL_FLAG_STOP_STREAM;
struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
@@ -1396,31 +1427,36 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
timezone = value;
else if(!strcmp(name, "tags"))
tags = value;
+ else if(!strcmp(name, "ver"))
+ stream_version = MIN((uint32_t) strtoul(value, NULL, 0), STREAMING_PROTOCOL_CURRENT_VERSION);
else {
- if(!strcmp(name, "NETDATA_PROTOCOL_VERSION"))
- stream_flags = LABEL_FLAG_UPDATE_STREAM;
- else {
- // An old Netdata slave does not have a compatible streaming protocol, map to something sane.
- if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME"))
- name = "NETDATA_HOST_OS_NAME";
- else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID"))
- name = "NETDATA_HOST_OS_ID";
- else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE"))
- name = "NETDATA_HOST_OS_ID_LIKE";
- else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION"))
- name = "NETDATA_HOST_OS_VERSION";
- else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID"))
- name = "NETDATA_HOST_OS_VERSION_ID";
- else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION"))
- name = "NETDATA_HOST_OS_DETECTION";
- if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
- info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.",
- w->client_ip, w->client_port, key, value);
- }
+ // An old Netdata slave does not have a compatible streaming protocol, map to something sane.
+ if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME"))
+ name = "NETDATA_HOST_OS_NAME";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID"))
+ name = "NETDATA_HOST_OS_ID";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE"))
+ name = "NETDATA_HOST_OS_ID_LIKE";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION"))
+ name = "NETDATA_HOST_OS_VERSION";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID"))
+ name = "NETDATA_HOST_OS_VERSION_ID";
+ else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION"))
+ name = "NETDATA_HOST_OS_DETECTION";
+ else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && stream_version == UINT_MAX) {
+ stream_version = 1;
+ }
+
+ if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
+ info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.",
+ w->client_ip, w->client_port, name, value);
}
}
}
+ if (stream_version == UINT_MAX)
+ stream_version = 0;
+
if(!key || !*key) {
rrdhost_system_info_free(system_info);
log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO KEY");
@@ -1532,7 +1568,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
rpt->client_port = strdupz(w->client_port);
rpt->update_every = update_every;
rpt->system_info = system_info;
- rpt->stream_flags = stream_flags;
+ rpt->stream_version = stream_version;
#ifdef ENABLE_HTTPS
rpt->ssl.conn = w->ssl.conn;
rpt->ssl.flags = w->ssl.flags;
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 6d290a835b..214c7c6fde 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -6,6 +6,8 @@
#include "web/server/web_client.h"
#include "daemon/common.h"
+#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)2
+
extern unsigned int default_rrdpush_enabled;
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;