diff options
-rw-r--r-- | collectors/plugins.d/plugins_d.c | 40 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 2 | ||||
-rw-r--r-- | database/rrd.h | 6 | ||||
-rw-r--r-- | database/rrdhost.c | 33 | ||||
-rw-r--r-- | streaming/rrdpush.c | 84 | ||||
-rw-r--r-- | streaming/rrdpush.h | 1 | ||||
-rw-r--r-- | streaming/stream.conf | 1 |
7 files changed, 150 insertions, 17 deletions
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index 32c5840328..7ac17587a9 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -234,9 +234,12 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int uint32_t DIMENSION_HASH = simple_hash(PLUGINSD_KEYWORD_DIMENSION); uint32_t DISABLE_HASH = simple_hash(PLUGINSD_KEYWORD_DISABLE); uint32_t VARIABLE_HASH = simple_hash(PLUGINSD_KEYWORD_VARIABLE); + uint32_t LABEL_HASH = simple_hash(PLUGINSD_KEYWORD_LABEL); + uint32_t OVERWRITE_HASH = simple_hash(PLUGINSD_KEYWORD_OVERWRITE); RRDSET *st = NULL; uint32_t hash; + struct label *new_labels = NULL; errno = 0; clearerr(fp); @@ -283,6 +286,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int #else r = fgets(line, PLUGINSD_LINE_MAX, fp); #endif + if(unlikely(!r)) { if(feof(fp)) error("read failed: end of file"); @@ -616,6 +620,39 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int enabled = 0; break; } + else if(likely(hash == LABEL_HASH && !strcmp(s, PLUGINSD_KEYWORD_LABEL))) { + debug(D_PLUGINSD, "requested a LABEL CHANGE"); + char *store; + if(!words[4]) + store = words[3]; + else { + store = callocz(PLUGINSD_LINE_MAX + 1, sizeof(char)); + char *move = store; + int i = 3; + while (i < w) { + size_t length = strlen(words[i]); + memcpy(move, words[i], length); + move += length; + *move++ = ' '; + + i++; + if(!words[i]) + break; + } + } + + new_labels = add_label_to_list(new_labels, words[1], store, strtol(words[2], NULL, 10)); + } + else if(likely(hash == OVERWRITE_HASH && !strcmp(s, PLUGINSD_KEYWORD_OVERWRITE))) { + debug(D_PLUGINSD, "requested a OVERWITE a variable"); + if(!host->labels) { + host->labels = new_labels; + } else { + replace_label_list(host, new_labels); + } + + new_labels = NULL; + } else { error("sent command '%s' which is not known by netdata, for host '%s'. Disabling it.", s, host->hostname); enabled = 0; @@ -626,6 +663,9 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int cleanup: cd->enabled = enabled; + if(new_labels) + free_host_labels(new_labels); + if(likely(count)) { cd->successful_collections += count; cd->serial_failures = 0; diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index 7d5c7dda47..8134b99650 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -29,6 +29,8 @@ #define PLUGINSD_KEYWORD_FLUSH "FLUSH" #define PLUGINSD_KEYWORD_DISABLE "DISABLE" #define PLUGINSD_KEYWORD_VARIABLE "VARIABLE" +#define PLUGINSD_KEYWORD_LABEL "LABEL" +#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE" #define PLUGINSD_LINE_MAX 1024 #define PLUGINSD_LINE_MAX_SSL_READ 512 diff --git a/database/rrd.h b/database/rrd.h index 0702fc1710..40efd60e92 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -156,6 +156,9 @@ typedef enum label_source { LABEL_SOURCE_KUBERNETES = 4 } LABEL_SOURCE; +#define LABEL_FLAG_UPDATE_STREAM 1 +#define LABEL_FLAG_STOP_STREAM 2 + struct label { char *key, *value; uint32_t key_hash; @@ -166,6 +169,8 @@ struct label { char *translate_label_source(LABEL_SOURCE l); struct label *create_label(char *key, char *value, LABEL_SOURCE label_source); struct label *add_label_to_list(struct label *l, char *key, char *value, LABEL_SOURCE label_source); +extern void replace_label_list(RRDHOST *host, struct label *new_labels); +extern void free_host_labels(struct label *labels); void reload_host_labels(); // ---------------------------------------------------------------------------- @@ -749,6 +754,7 @@ struct rrdhost { // Support for host-level labels struct label *labels; netdata_rwlock_t labels_rwlock; // lock for the label list + uint32_t labels_flag; //Flags for labels // ------------------------------------------------------------------------ // indexes diff --git a/database/rrdhost.c b/database/rrdhost.c index df4e364d9b..a7fac8569b 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -890,6 +890,26 @@ struct label *create_label(char *key, char *value, LABEL_SOURCE label_source) return result; } +void free_host_labels(struct label *labels) +{ + while (labels != NULL) + { + struct label *current = labels; + labels = labels->next; + freez(current); + } +} + +void replace_label_list(RRDHOST *host, struct label *new_labels) +{ + netdata_rwlock_wrlock(&host->labels_rwlock); + struct label *old_labels = host->labels; + host->labels = new_labels; + netdata_rwlock_unlock(&host->labels_rwlock); + + free_host_labels(old_labels); +} + struct label *add_label_to_list(struct label *l, char *key, char *value, LABEL_SOURCE label_source) { struct label *lab = create_label(key, value, label_source); @@ -937,16 +957,11 @@ void reload_host_labels() struct label *new_labels = merge_label_lists(from_auto, from_k8s); new_labels = merge_label_lists(new_labels, from_config); - netdata_rwlock_wrlock(&localhost->labels_rwlock); - struct label *old_labels = localhost->labels; - localhost->labels = new_labels; - netdata_rwlock_unlock(&localhost->labels_rwlock); + replace_label_list(localhost, new_labels); - while (old_labels != NULL) - { - struct label *current = old_labels; - old_labels = old_labels->next; - freez(current); + if(localhost->rrdpush_send_enabled && localhost->rrdpush_sender_buffer){ + localhost->labels_flag |= LABEL_FLAG_UPDATE_STREAM; + rrdpush_send_labels(localhost); } health_reload(); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index c88ee717ad..c6c391e4b1 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -25,7 +25,9 @@ * */ +#define STREAMING_PROTOCOL_VERSION "1.1" #define START_STREAMING_PROMPT "Hit me baby, push them over..." +#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..." typedef enum { RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW, @@ -79,6 +81,7 @@ int rrdpush_init() { default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*"); rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time); + if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) { error("STREAM [send]: cannot enable sending thread - information is missing."); default_rrdpush_enabled = 0; @@ -338,6 +341,36 @@ void rrdset_done_push(RRDSET *st) { rrdpush_buffer_unlock(host); } +// labels +void rrdpush_send_labels(RRDHOST *host) { + if (!host->labels || !(host->labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels_flag & LABEL_FLAG_STOP_STREAM)) + return; + + rrdpush_buffer_lock(host); + netdata_rwlock_rdlock(&host->labels_rwlock); + + struct label *labels = host->labels; + while(labels) { + buffer_sprintf(host->rrdpush_sender_buffer + , "LABEL \"%s\" = %d %s\n" + , labels->key + , (int)labels->label_source + , labels->value); + + labels = labels->next; + } + + buffer_sprintf(host->rrdpush_sender_buffer + , "OVERWRITE %s\n", "labels"); + + netdata_rwlock_unlock(&host->labels_rwlock); + + if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) + error("STREAM %s [send]: cannot write to internal pipe", host->hostname); + + rrdpush_buffer_unlock(host); + host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; +} // ---------------------------------------------------------------------------- // rrdpush sender thread @@ -536,6 +569,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po "&NETDATA_SYSTEM_VIRT_DETECTION=%s" "&NETDATA_SYSTEM_CONTAINER=%s" "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s" + "&NETDATA_PROTOCOL_VERSION=%s" " HTTP/1.1\r\n" "User-Agent: %s/%s\r\n" "Accept: */*\r\n\r\n" @@ -560,6 +594,7 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po , (host->system_info->virt_detection) ? host->system_info->virt_detection : "" , (host->system_info->container) ? host->system_info->container : "" , (host->system_info->container_detection) ? host->system_info->container_detection : "" + , STREAMING_PROTOCOL_VERSION , host->program_name , host->program_version ); @@ -613,7 +648,20 @@ static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_po return 0; } - if(strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)) != 0) { + int answer = strncmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2)); + if(!answer) { + host->labels_flag |= LABEL_FLAG_UPDATE_STREAM; + host->labels_flag &= ~LABEL_FLAG_STOP_STREAM; + } else { + answer = strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)); + if(!answer) { + host->labels_flag |= LABEL_FLAG_STOP_STREAM; + host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; + info("STREAM %s [send to %s]: is using an old Netdata.", host->hostname, connected_to); + } + } + + if(answer != 0) { error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, connected_to); rrdpush_sender_thread_close_socket(host); return 0; @@ -814,6 +862,8 @@ void *rrdpush_sender_thread(void *ptr) { } if (ofd->revents & POLLOUT) { + rrdpush_send_labels(host); + if (begin < buffer_strlen(host->rrdpush_sender_buffer)) { debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes, begin = %zu)...", buffer_strlen(host->rrdpush_sender_buffer), begin); @@ -981,6 +1031,7 @@ static int rrdpush_receive(int fd , int update_every , char *client_ip , char *client_port + , int stream_flags #ifdef ENABLE_HTTPS , struct netdata_ssl *ssl #endif @@ -1097,12 +1148,20 @@ static int rrdpush_receive(int fd snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port); info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port); + char *initial_response; + if (stream_flags & LABEL_FLAG_UPDATE_STREAM) { + info("STREAM %s [receive from [%s]:%s]: Netdata is using the newest stream protocol.", host->hostname, client_ip, client_port); + initial_response = START_STREAMING_PROMPT_V2; + } else { + info("STREAM %s [receive from [%s]:%s]: Netdata is using an old protocol.", host->hostname, client_ip, client_port); + initial_response = START_STREAMING_PROMPT; + } #ifdef ENABLE_HTTPS host->stream_ssl.conn = ssl->conn; host->stream_ssl.flags = ssl->flags; - if(send_timeout(ssl,fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) { + if(send_timeout(ssl, fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { #else - if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) { + if(send_timeout(fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { #endif log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "FAILED - CANNOT REPLY"); error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port); @@ -1142,6 +1201,8 @@ static int rrdpush_receive(int fd rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); host->connected_senders++; host->senders_disconnected_time = 0; + host->labels_flag = stream_flags; + if(health_enabled != CONFIG_BOOLEAN_NO) { if(alarms_delay > 0) { host->health_delay_up_to = now_realtime_sec() + alarms_delay; @@ -1196,6 +1257,7 @@ struct rrdpush_thread { char *program_version; struct rrdhost_system_info *system_info; int update_every; + int stream_flags; #ifdef ENABLE_HTTPS struct netdata_ssl ssl; #endif @@ -1251,6 +1313,7 @@ static void *rrdpush_receiver_thread(void *ptr) { , rpt->update_every , rpt->client_ip , rpt->client_port + , rpt->stream_flags #ifdef ENABLE_HTTPS , &rpt->ssl #endif @@ -1300,6 +1363,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *tags = NULL; int update_every = default_rrd_update_every; char buf[GUID_LEN + 1]; + int stream_flags = LABEL_FLAG_STOP_STREAM; struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info)); @@ -1327,10 +1391,15 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url timezone = value; else if(!strcmp(name, "tags")) tags = value; - else - if(unlikely(rrdhost_set_system_info_variable(system_info, name, value))) { - info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", w->client_ip, w->client_port, key, value); - } + else { + if(!strcmp(name, "NETDATA_PROTOCOL_VERSION")) + stream_flags = LABEL_FLAG_UPDATE_STREAM; + else + if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) { + info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.", + w->client_ip, w->client_port, key, value); + } + } } if(!key || !*key) { @@ -1444,6 +1513,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url rpt->client_port = strdupz(w->client_port); rpt->update_every = update_every; rpt->system_info = system_info; + rpt->stream_flags = stream_flags; #ifdef ENABLE_HTTPS rpt->ssl.conn = w->ssl.conn; rpt->ssl.flags = w->ssl.flags; diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 7b1acd9e11..6d290a835b 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -17,6 +17,7 @@ extern int configured_as_master(); extern void rrdset_done_push(RRDSET *st); extern void rrdset_push_chart_definition_now(RRDSET *st); extern void *rrdpush_sender_thread(void *ptr); +extern void rrdpush_send_labels(RRDHOST *host); extern int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url); extern void rrdpush_sender_thread_stop(RRDHOST *host); diff --git a/streaming/stream.conf b/streaming/stream.conf index fdff1f25fc..58dbcd8e68 100644 --- a/streaming/stream.conf +++ b/streaming/stream.conf @@ -87,7 +87,6 @@ # Sync the clock of the charts for that many iterations, when starting. initial clock resync iterations = 60 - # ----------------------------------------------------------------------------- # 2. ON MASTER NETDATA - THE ONE THAT WILL BE RECEIVING METRICS |