diff options
author | thiagoftsm <thiagoftsm@gmail.com> | 2020-01-14 10:27:22 +0000 |
---|---|---|
committer | Timo <timotej@netdata.cloud> | 2020-01-14 11:27:22 +0100 |
commit | ef2b11fcb4d56ec946f6dc24929ba6ec0b54d0f2 (patch) | |
tree | 5ac5e89b35404bdabc08162505f9ad02c86df7c3 /streaming | |
parent | bb51e824f97cd135674e2940bdbd5458fbfba15f (diff) |
Stream with labels (#7549)
This commit enables streaming host labels
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/rrdpush.c | 84 | ||||
-rw-r--r-- | streaming/rrdpush.h | 1 | ||||
-rw-r--r-- | streaming/stream.conf | 1 |
3 files changed, 78 insertions, 8 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index c88ee717ad..c6c391e4b1 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -25,7 +25,9 @@ * */ +#define STREAMING_PROTOCOL_VERSION "1.1" #define START_STREAMING_PROMPT "Hit me baby, push them over..." +#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..." typedef enum { RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW, @@ -79,6 +81,7 @@ int rrdpush_init() { default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*"); rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time); + if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) { error("STREAM [send]: cannot enable sending thread - information is missing."); default_rrdpush_enabled = 0; @@ -338,6 +341,36 @@ void rrdset_done_push(RRDSET *st) { rrdpush_buffer_unlock(host); } +// labels +void rrdpush_send_labels(RRDHOST *host) { + if (!host->labels || !(host->labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels_flag & LABEL_FLAG_STOP_STREAM)) + return; + + rrdpush_buffer_lock(host); + netdata_rwlock_rdlock(&host->labels_rwlock); + + struct label *labels = host->labels; + while(labels) { + buffer_sprintf(host->rrdpush_sender_buffer + , "LABEL \"%s\" = %d %s\n" + , labels->key + , (int)labels->label_source + , labels->value); + + labels = labels->next; + } + + buffer_sprintf(host->rrdpush_sender_buffer + , "OVERWRITE %s\n", "labels"); + + netdata_rwlock_unlock(&host->labels_rwlock); + + if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) + error("STREAM %s [send]: cannot write to internal pipe", host->hostname); + + rrdpush_buffer_unlock(host); + host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; +} // ---------------------------------------------------------------------------- // rrdpush sender thread @@ -536,6 +569,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po "&NETDATA_SYSTEM_VIRT_DETECTION=%s" "&NETDATA_SYSTEM_CONTAINER=%s" "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s" + "&NETDATA_PROTOCOL_VERSION=%s" " HTTP/1.1\r\n" "User-Agent: %s/%s\r\n" "Accept: */*\r\n\r\n" @@ -560,6 +594,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po , (host->system_info->virt_detection) ? host->system_info->virt_detection : "" , (host->system_info->container) ? host->system_info->container : "" , (host->system_info->container_detection) ? host->system_info->container_detection : "" + , STREAMING_PROTOCOL_VERSION , host->program_name , host->program_version ); @@ -613,7 +648,20 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po return 0; } - if(strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)) != 0) { + int answer = strncmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2)); + if(!answer) { + host->labels_flag |= LABEL_FLAG_UPDATE_STREAM; + host->labels_flag &= ~LABEL_FLAG_STOP_STREAM; + } else { + answer = strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)); + if(!answer) { + host->labels_flag |= LABEL_FLAG_STOP_STREAM; + host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; + info("STREAM %s [send to %s]: is using an old Netdata.", host->hostname, connected_to); + } + } + + if(answer != 0) { error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, connected_to); rrdpush_sender_thread_close_socket(host); return 0; @@ -814,6 +862,8 @@ void *rrdpush_sender_thread(void *ptr) { } if (ofd->revents & POLLOUT) { + rrdpush_send_labels(host); + if (begin < buffer_strlen(host->rrdpush_sender_buffer)) { debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes, begin = %zu)...", buffer_strlen(host->rrdpush_sender_buffer), begin); @@ -981,6 +1031,7 @@ static int rrdpush_receive(int fd , int update_every , char *client_ip , char *client_port + , int stream_flags #ifdef ENABLE_HTTPS , struct netdata_ssl *ssl #endif @@ -1097,12 +1148,20 @@ static int rrdpush_receive(int fd snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port); info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port); + char *initial_response; + if (stream_flags & LABEL_FLAG_UPDATE_STREAM) { + info("STREAM %s [receive from [%s]:%s]: Netdata is using the newest stream protocol.", host->hostname, client_ip, client_port); + initial_response = START_STREAMING_PROMPT_V2; + } else { + info("STREAM %s [receive from [%s]:%s]: Netdata is using an old protocol.", host->hostname, client_ip, client_port); + initial_response = START_STREAMING_PROMPT; + } #ifdef ENABLE_HTTPS host->stream_ssl.conn = ssl->conn; host->stream_ssl.flags = ssl->flags; - if(send_timeout(ssl,fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) { + if(send_timeout(ssl, fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { #else - if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) { + if(send_timeout(fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { #endif log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "FAILED - CANNOT REPLY"); error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port); @@ -1142,6 +1201,8 @@ static int rrdpush_receive(int fd rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); host->connected_senders++; host->senders_disconnected_time = 0; + host->labels_flag = stream_flags; + if(health_enabled != CONFIG_BOOLEAN_NO) { if(alarms_delay > 0) { host->health_delay_up_to = now_realtime_sec() + alarms_delay; @@ -1196,6 +1257,7 @@ struct rrdpush_thread { char *program_version; struct rrdhost_system_info *system_info; int update_every; + int stream_flags; #ifdef ENABLE_HTTPS struct netdata_ssl ssl; #endif @@ -1251,6 +1313,7 @@ static void *rrdpush_receiver_thread(void *ptr) { , rpt->update_every , rpt->client_ip , rpt->client_port + , rpt->stream_flags #ifdef ENABLE_HTTPS , &rpt->ssl #endif @@ -1300,6 +1363,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL; int update_every = default_rrd_update_every; char buf[GUID_LEN + 1]; + int stream_flags = LABEL_FLAG_STOP_STREAM; struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); @@ -1327,10 +1391,15 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url timezone = value; else if(!strcmp(name, "tags")) tags = value; - else - if(unlikely(rrdhost_set_system_info_variable(system_info, name, value))) { - info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", w->client_ip, w->client_port, key, value); - } + else { + if(!strcmp(name, "NETDATA_PROTOCOL_VERSION")) + stream_flags = LABEL_FLAG_UPDATE_STREAM; + else + if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) { + info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", + w->client_ip, w->client_port, key, value); + } + } } if(!key || !*key) { @@ -1444,6 +1513,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url rpt->client_port = strdupz(w->client_port); rpt->update_every = update_every; rpt->system_info = system_info; + rpt->stream_flags = stream_flags; #ifdef ENABLE_HTTPS rpt->ssl.conn = w->ssl.conn; rpt->ssl.flags = w->ssl.flags; diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 7b1acd9e11..6d290a835b 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -17,6 +17,7 @@ extern int configured_as_master(); extern void rrdset_done_push(RRDSET *st); extern void rrdset_push_chart_definition_now(RRDSET *st); extern void *rrdpush_sender_thread(void *ptr); +extern void rrdpush_send_labels(RRDHOST *host); extern int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url); extern void rrdpush_sender_thread_stop(RRDHOST *host); diff --git a/streaming/stream.conf b/streaming/stream.conf index fdff1f25fc..58dbcd8e68 100644 --- a/streaming/stream.conf +++ b/streaming/stream.conf @@ -87,7 +87,6 @@ # Sync the clock of the charts for that many iterations, when starting. initial clock resync iterations = 60 - # ----------------------------------------------------------------------------- # 2. ON MASTER NETDATA - THE ONE THAT WILL BE RECEIVING METRICS |