diff options
author | thiagoftsm <thiagoftsm@gmail.com> | 2020-02-05 20:13:09 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-05 20:13:08 +0000 |
commit | d9f977b864a8abc2213201817a93a041c30d17b4 (patch) | |
tree | b998fefd6408fb658001ffa4f521beac02fd04df /streaming | |
parent | 694b960620f5ea164c9a8dd2a37cd680f9a48fac (diff) |
Stream with version (#7851)
* stream_forward: Fix protocol
This commit brings the necessary fixes to the protocol
* stream_forward: Fix old slave support
This commit fixes the communication with old versions of Netdata
* stream_forward: Remove declaration
There was a wrong declaration inside a block, so I am removing it
* stream_forward: USe version
This commit brings the use of version instead flags to stream
* stream_forward: Remove variable
This commit removes useless variable from hand shake
* stream_forward: Change message
Change the message setting the protocol version on it
* stream_forward: Fix version number
* stream_forward: readable definition
The definition and the variables were using the same data type, but with different declaration,
this commit fixes this.
* stream_forward: Set master version inside message
This commit updates the message used that there was a successfull connection with master
* stream_forward: FIx wrong version
This commit fixes the multiple set for stream version
* stream_forward: Reorganize code
This commit reorganizes code to speed up the processing
* stream_forward: Adjust code
This commit removes an unecessary else
* stream_forward: Brings old structure
This commits returns a previous necessary to the code
* stream_forward: fix error report
This commit fixes the error report that was happening when the stream version does not match
* stream_forward: Fixes msg and remove unecessary call
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/rrdpush.c | 126 | ||||
-rw-r--r-- | streaming/rrdpush.h | 2 |
2 files changed, 83 insertions, 45 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 59ae0b1537..701fe39953 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -28,6 +28,7 @@ #define STREAMING_PROTOCOL_VERSION "1.1" #define START_STREAMING_PROMPT "Hit me baby, push them over..." #define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..." +#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version=" typedef enum { RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW, @@ -496,6 +497,11 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { } } +static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) { + host->labels_flag |= LABEL_FLAG_UPDATE_STREAM; + host->labels_flag &= ~LABEL_FLAG_STOP_STREAM; +} + //called from client side static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_port, int timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size) { struct timeval tv = { @@ -560,7 +566,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po #define HTTP_HEADER_SIZE 8192 char http[HTTP_HEADER_SIZE + 1]; int eol = snprintfz(http, HTTP_HEADER_SIZE, - "STREAM key=%s&hostname=%s®istry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s" + "STREAM key=%s&hostname=%s®istry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s&ver=%u" "&NETDATA_SYSTEM_OS_NAME=%s" "&NETDATA_SYSTEM_OS_ID=%s" "&NETDATA_SYSTEM_OS_ID_LIKE=%s" @@ -586,6 +592,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po , host->os , host->timezone , (host->tags) ? host->tags : "" + , STREAMING_PROTOCOL_CURRENT_VERSION , (host->system_info->host_os_name) ? host->system_info->host_os_name : "" , (host->system_info->host_os_id) ? host->system_info->host_os_id : "" , (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : "" @@ -643,26 +650,44 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, connected_to); + ssize_t received; #ifdef ENABLE_HTTPS - if(recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout) == -1) { + received = recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout); + if(received == -1) { #else - if(recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout) == -1) { + received = recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout); + if(received == -1) { #endif error("STREAM %s [send to %s]: remote netdata does not respond.", host->hostname, connected_to); rrdpush_sender_thread_close_socket(host); return 0; } - int answer = strncmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2)); - if(!answer) { - host->labels_flag |= LABEL_FLAG_UPDATE_STREAM; - host->labels_flag &= ~LABEL_FLAG_STOP_STREAM; + http[received] = '\0'; + int answer = -1; + char *version_start = strchr(http, '='); + uint32_t version; + if(version_start) { + version_start++; + version = (uint32_t)strtol(version_start, NULL, 10); + answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(version_start - http)); + if(!answer) { + rrdpush_set_flags_to_newest_stream(host); + host->stream_version = version; + } } else { - answer = strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)); + answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2)); if(!answer) { - host->labels_flag |= LABEL_FLAG_STOP_STREAM; - host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; - info("STREAM %s [send to %s]: is using an old Netdata.", host->hostname, connected_to); + version = 1; + rrdpush_set_flags_to_newest_stream(host); + } + else { + answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)); + if(!answer) { + version = 0; + host->labels_flag |= LABEL_FLAG_STOP_STREAM; + host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; + } } } @@ -672,7 +697,10 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po return 0; } - info("STREAM %s [send to %s]: established communication - ready to send metrics...", host->hostname, connected_to); + info("STREAM %s [send to %s]: established communication with a master using protocol version %u - ready to send metrics..." + , host->hostname + , connected_to + , version); if(sock_setnonblock(host->rrdpush_sender_socket) < 0) error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, connected_to); @@ -1036,7 +1064,7 @@ static int rrdpush_receive(int fd , int update_every , char *client_ip , char *client_port - , int stream_flags + , uint32_t stream_version #ifdef ENABLE_HTTPS , struct netdata_ssl *ssl #endif @@ -1153,15 +1181,18 @@ static int rrdpush_receive(int fd snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port); info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port); - char *initial_response; - if (stream_flags & LABEL_FLAG_UPDATE_STREAM) { - info("STREAM %s [receive from [%s]:%s]: Netdata is using the newest stream protocol.", host->hostname, client_ip, client_port); - initial_response = START_STREAMING_PROMPT_V2; + char initial_response[HTTP_HEADER_SIZE]; + if (stream_version > 1) { + info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", host->hostname, client_ip, client_port, stream_version); + sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, stream_version); + } else if (stream_version == 1) { + info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", host->hostname, client_ip, client_port, stream_version); + sprintf(initial_response, "%s", START_STREAMING_PROMPT_V2); } else { - info("STREAM %s [receive from [%s]:%s]: Netdata is using an old protocol.", host->hostname, client_ip, client_port); - initial_response = START_STREAMING_PROMPT; + info("STREAM %s [receive from [%s]:%s]: Netdata is using first stream protocol.", host->hostname, client_ip, client_port); + sprintf(initial_response, "%s", START_STREAMING_PROMPT); } -#ifdef ENABLE_HTTPS + #ifdef ENABLE_HTTPS host->stream_ssl.conn = ssl->conn; host->stream_ssl.flags = ssl->flags; if(send_timeout(ssl, fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { @@ -1206,7 +1237,7 @@ static int rrdpush_receive(int fd rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); host->connected_senders++; host->senders_disconnected_time = 0; - host->labels_flag = stream_flags; + host->labels_flag = (stream_version > 0)?LABEL_FLAG_UPDATE_STREAM:LABEL_FLAG_STOP_STREAM; if(health_enabled != CONFIG_BOOLEAN_NO) { if(alarms_delay > 0) { @@ -1262,7 +1293,7 @@ struct rrdpush_thread { char *program_version; struct rrdhost_system_info *system_info; int update_every; - int stream_flags; + uint32_t stream_version; #ifdef ENABLE_HTTPS struct netdata_ssl ssl; #endif @@ -1318,7 +1349,7 @@ static void *rrdpush_receiver_thread(void *ptr) { , rpt->update_every , rpt->client_ip , rpt->client_port - , rpt->stream_flags + , rpt->stream_version #ifdef ENABLE_HTTPS , &rpt->ssl #endif @@ -1367,8 +1398,8 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL; int update_every = default_rrd_update_every; + uint32_t stream_version = UINT_MAX; char buf[GUID_LEN + 1]; - int stream_flags = LABEL_FLAG_STOP_STREAM; struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); @@ -1396,31 +1427,36 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url timezone = value; else if(!strcmp(name, "tags")) tags = value; + else if(!strcmp(name, "ver")) + stream_version = MIN((uint32_t) strtoul(value, NULL, 0), STREAMING_PROTOCOL_CURRENT_VERSION); else { - if(!strcmp(name, "NETDATA_PROTOCOL_VERSION")) - stream_flags = LABEL_FLAG_UPDATE_STREAM; - else { - // An old Netdata slave does not have a compatible streaming protocol, map to something sane. - if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME")) - name = "NETDATA_HOST_OS_NAME"; - else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID")) - name = "NETDATA_HOST_OS_ID"; - else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE")) - name = "NETDATA_HOST_OS_ID_LIKE"; - else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION")) - name = "NETDATA_HOST_OS_VERSION"; - else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID")) - name = "NETDATA_HOST_OS_VERSION_ID"; - else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION")) - name = "NETDATA_HOST_OS_DETECTION"; - if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) { - info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", - w->client_ip, w->client_port, key, value); - } + // An old Netdata slave does not have a compatible streaming protocol, map to something sane. + if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME")) + name = "NETDATA_HOST_OS_NAME"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID")) + name = "NETDATA_HOST_OS_ID"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE")) + name = "NETDATA_HOST_OS_ID_LIKE"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION")) + name = "NETDATA_HOST_OS_VERSION"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID")) + name = "NETDATA_HOST_OS_VERSION_ID"; + else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION")) + name = "NETDATA_HOST_OS_DETECTION"; + else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && stream_version == UINT_MAX) { + stream_version = 1; + } + + if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) { + info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", + w->client_ip, w->client_port, name, value); } } } + if (stream_version == UINT_MAX) + stream_version = 0; + if(!key || !*key) { rrdhost_system_info_free(system_info); log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO KEY"); @@ -1532,7 +1568,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url rpt->client_port = strdupz(w->client_port); rpt->update_every = update_every; rpt->system_info = system_info; - rpt->stream_flags = stream_flags; + rpt->stream_version = stream_version; #ifdef ENABLE_HTTPS rpt->ssl.conn = w->ssl.conn; rpt->ssl.flags = w->ssl.flags; diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 6d290a835b..214c7c6fde 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -6,6 +6,8 @@ #include "web/server/web_client.h" #include "daemon/common.h" +#define STREAMING_PROTOCOL_CURRENT_VERSION (uint32_t)2 + extern unsigned int default_rrdpush_enabled; extern char *default_rrdpush_destination; extern char *default_rrdpush_api_key; |