diff options
-rw-r--r-- | conf.d/Makefile.am | 2 | ||||
-rw-r--r-- | conf.d/stream.conf (renamed from conf.d/aggregated_hosts.conf) | 91 | ||||
-rw-r--r-- | src/main.c | 21 | ||||
-rw-r--r-- | src/registry.h | 1 | ||||
-rw-r--r-- | src/registry_internals.c | 12 | ||||
-rw-r--r-- | src/registry_internals.h | 2 | ||||
-rw-r--r-- | src/registry_machine.c | 2 | ||||
-rw-r--r-- | src/registry_person.c | 2 | ||||
-rw-r--r-- | src/rrd.h | 15 | ||||
-rw-r--r-- | src/rrdhost.c | 61 | ||||
-rw-r--r-- | src/rrdpush.c | 98 | ||||
-rw-r--r-- | src/rrdpush.h | 2 |
12 files changed, 213 insertions, 96 deletions
diff --git a/conf.d/Makefile.am b/conf.d/Makefile.am index dd6c2cbf9e..743ab19e93 100644 --- a/conf.d/Makefile.am +++ b/conf.d/Makefile.am @@ -4,7 +4,6 @@ MAINTAINERCLEANFILES= $(srcdir)/Makefile.in dist_config_DATA = \ - aggregated_hosts.conf \ apps_groups.conf \ charts.d.conf \ fping.conf \ @@ -12,6 +11,7 @@ dist_config_DATA = \ python.d.conf \ health_alarm_notify.conf \ health_email_recipients.conf \ + stream.conf \ $(NULL) nodeconfigdir=$(configdir)/node.d diff --git a/conf.d/aggregated_hosts.conf b/conf.d/stream.conf index d192a55eb8..2dcb25a629 100644 --- a/conf.d/aggregated_hosts.conf +++ b/conf.d/stream.conf @@ -8,27 +8,31 @@ # # ----------------------------------------------------------------------------- # 1. SLAVE NETDATA - THE ONE THAT WILL BE SENDING METRICS -# -# In /etc/netdata/netdata.conf you have a configuration like this: -# -# [stream] -# -# # enable or disable sending metrics. -# # default: no -# enabled = yes -# -# # one or more netdata hosts to send metrics to. -# # only one will get them, the first available. -# destination = HOST1:PORT1 HOST2:PORT2 ... -# -# # the API key to use, to authorize ourselves -# api key = API_KEY -# + +[stream] + enabled = no + + # where to send metrics to? + # A space separated list of IP:PORT is accepted. The first available will + # get the metrics. + # IPv6 addresses should be [IP]:PORT + destination = + + # The API_KEY to use (as the sender) + api key = + + # other options (uncomment and set) + + # timeout seconds = 60 + # default port = 19999 + # buffer size bytes = 1048576 + # reconnect delay seconds = 5 + # initial clock resync iterations = 60 + + # ----------------------------------------------------------------------------- # 2. MASTER NETDATA - THE ONE THAT WILL BE RECEIVING METRICS # -# Use this file to define the API keys is accepts. -# # You can have one API key per slave, or the same API key for all slaves. # # All options below are used in this order: @@ -38,8 +42,6 @@ # c) this netdata defaults (as in netdata.conf) # # You can combine the above (the more specific setting will be used). -# -# ----------------------------------------------------------------------------- # API key authentication # If the key is not listed here, it will not be able to connect. @@ -49,29 +51,41 @@ # You can disable the API key, by setting this to: no # The default (for unknown API keys) is also: no -# enabled = yes + enabled = no # The default history in entries, for all hosts using this API key. # You can also set it per host below. # If you don't set it here, the history size of the central netdata # will be used -# default history = 3600 + default history = 3600 # The default memory mode to be used for all hosts using this API key. # You can also set it per host below. - # If you don't set it here, the memory mode of the central netdata - # will be used. - # Valid modes: save (load/save db), map (like swap), ram (no disk) -# default memory mode = save + # If you don't set it here, the memory mode of netdata.conf will be used. + # Valid modes: + # save save on exit, load on start + # map like swap (continuously syncing to disks) + # ram keep it in RAM, don't touch the disk + # none no database (passing through this netdata) + default memory mode = ram # Shall we enable health monitoring for the hosts using this API key? # 3 values: # yes enable alarms # no do not enable alarms - # auto enable alarms, only when the host is streaming metrics + # auto enable alarms, only when the sending netdata is connected # You can also set it per host, below. - # The default is the same as the central netdata -# health enabled by default = auto + # The default is the same as to netdata.conf + health enabled by default = auto + + # postpone alarms for a short period after the sender is connected + default postpone alarms on connect seconds = 60 + + # need to route metrics differently? set these. + # the defaults are the ones at the [stream] section + #default proxy enabled = yes | no + #default proxy destination = IP:PORT IP:PORT ... + #default proxy api key = API_KEY # ----------------------------------------------------------------------------- @@ -81,17 +95,24 @@ # you can give settings for each specific host here. [MACHINE_GUID] - # This can be used to stop receiving data + # enable this host: yes | no # THIS IS NOT A SECURITY MECHANISM - AN ATTACKER CAN SET ANY OTHER GUID. # Use only the API key for security. -# enabled = yes + enabled = no # The number of entries in the database -# history = 3600 + history = 3600 - # The memory mode of the database -# memory mode = save + # The memory mode of the database: save | map | ram | none + memory mode = save # Health / alarms control: yes | no | auto -# health enabled = yes + health enabled = yes + + # postpone alarms when the sender connects + postpone alarms on connect seconds = 60 + # need to route metrics differently? + #proxy enabled = yes | no + #proxy destination = IP:PORT IP:PORT ... + #proxy api key = API_KEY diff --git a/src/main.c b/src/main.c index 7f590aca60..ae8f599aae 100644 --- a/src/main.c +++ b/src/main.c @@ -705,6 +705,8 @@ int main(int argc, char **argv) { if(!config_loaded) config_load(NULL, 0); + // ------------------------------------------------------------------------ + // initialize netdata { char *pmax = config_get(CONFIG_SECTION_GLOBAL, "glibc malloc arena max for plugins", "1"); if(pmax && *pmax) @@ -757,6 +759,16 @@ int main(int argc, char **argv) { log_init(); error_log_limit_unlimited(); + + // -------------------------------------------------------------------- + // load stream.conf + { + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "%s/stream.conf", netdata_configured_config_dir); + appconfig_load(&stream_config, filename, 0); + } + + // -------------------------------------------------------------------- // setup process signals @@ -851,15 +863,6 @@ int main(int argc, char **argv) { if(web_server_mode != WEB_SERVER_MODE_NONE) create_listen_sockets(); - - - // -------------------------------------------------------------------- - // load the aggregated host configuration file - { - char filename[FILENAME_MAX + 1]; - snprintfz(filename, FILENAME_MAX, "%s/aggregated_hosts.conf", netdata_configured_config_dir); - appconfig_load(&stream_config, filename, 0); - } } // initialize the log files diff --git a/src/registry.h b/src/registry.h index 7940cbb7a9..2c4592b922 100644 --- a/src/registry.h +++ b/src/registry.h @@ -70,5 +70,6 @@ extern int registry_request_hello_json(RRDHOST *host, struct web_client *w); extern void registry_statistics(void); extern char *registry_get_this_machine_guid(void); +extern int regenerate_guid(const char *guid, char *result); #endif /* NETDATA_REGISTRY_H */ diff --git a/src/registry_internals.c b/src/registry_internals.c index df330cc8a7..9ec91ba401 100644 --- a/src/registry_internals.c +++ b/src/registry_internals.c @@ -7,7 +7,7 @@ struct registry registry; // parse a GUID and re-generated to be always lower case // this is used as a protection against the variations of GUIDs -int registry_regenerate_guid(const char *guid, char *result) { +int regenerate_guid(const char *guid, char *result) { uuid_t uuid; if(unlikely(uuid_parse(guid, uuid) == -1)) { info("Registry: GUID '%s' is not a valid GUID.", guid); @@ -18,7 +18,7 @@ int registry_regenerate_guid(const char *guid, char *result) { #ifdef NETDATA_INTERNAL_CHECKS if(strcmp(guid, result)) - info("Registry: source GUID '%s' and re-generated GUID '%s' differ!", guid, result); + info("GUID '%s' and re-generated GUID '%s' differ!", guid, result); #endif /* NETDATA_INTERNAL_CHECKS */ } @@ -96,14 +96,14 @@ REGISTRY_PERSON_URL *registry_verify_request(char *person_guid, char *machine_gu url = registry_fix_url(url, NULL); // make sure the person GUID is valid - if(registry_regenerate_guid(person_guid, pbuf) == -1) { + if(regenerate_guid(person_guid, pbuf) == -1) { info("Registry Request Verification: invalid person GUID, person: '%s', machine '%s', url '%s'", person_guid, machine_guid, url); return NULL; } person_guid = pbuf; // make sure the machine GUID is valid - if(registry_regenerate_guid(machine_guid, mbuf) == -1) { + if(regenerate_guid(machine_guid, mbuf) == -1) { info("Registry Request Verification: invalid machine GUID, person: '%s', machine '%s', url '%s'", person_guid, machine_guid, url); return NULL; } @@ -226,7 +226,7 @@ REGISTRY_MACHINE *registry_request_machine(char *person_guid, char *machine_guid if(!pu || !p || !m) return NULL; // make sure the machine GUID is valid - if(registry_regenerate_guid(request_machine, mbuf) == -1) { + if(regenerate_guid(request_machine, mbuf) == -1) { info("Registry Machine URLs request: invalid machine GUID, person: '%s', machine '%s', url '%s', request machine '%s'", p->guid, m->guid, pu->url->url, request_machine); return NULL; } @@ -288,7 +288,7 @@ char *registry_get_this_machine_guid(void) { error("Failed to read machine GUID from '%s'", registry.machine_guid_filename); else { buf[GUID_LEN] = '\0'; - if(registry_regenerate_guid(buf, guid) == -1) { + if(regenerate_guid(buf, guid) == -1) { error("Failed to validate machine GUID '%s' from '%s'. Ignoring it - this might mean this netdata will appear as duplicate in the registry.", buf, registry.machine_guid_filename); diff --git a/src/registry_internals.h b/src/registry_internals.h index 8aef16b22a..27c2fe0745 100644 --- a/src/registry_internals.h +++ b/src/registry_internals.h @@ -59,7 +59,7 @@ struct registry { pthread_mutex_t lock; }; -extern int registry_regenerate_guid(const char *guid, char *result); +extern int regenerate_guid(const char *guid, char *result); #include "registry_url.h" #include "registry_machine.h" diff --git a/src/registry_machine.c b/src/registry_machine.c index 3510736df4..6dc8200d39 100644 --- a/src/registry_machine.c +++ b/src/registry_machine.c @@ -58,7 +58,7 @@ REGISTRY_MACHINE *registry_machine_get(const char *machine_guid, time_t when) { if(likely(machine_guid && *machine_guid)) { // validate it is a GUID char buf[GUID_LEN + 1]; - if(unlikely(registry_regenerate_guid(machine_guid, buf) == -1)) + if(unlikely(regenerate_guid(machine_guid, buf) == -1)) info("Registry: machine guid '%s' is not a valid guid. Ignoring it.", machine_guid); else { machine_guid = buf; diff --git a/src/registry_person.c b/src/registry_person.c index 5f9099c9aa..409c76925b 100644 --- a/src/registry_person.c +++ b/src/registry_person.c @@ -183,7 +183,7 @@ REGISTRY_PERSON *registry_person_get(const char *person_guid, time_t when) { if(person_guid && *person_guid) { char buf[GUID_LEN + 1]; // validate it is a GUID - if(unlikely(registry_regenerate_guid(person_guid, buf) == -1)) + if(unlikely(regenerate_guid(person_guid, buf) == -1)) info("Registry: person guid '%s' is not a valid guid. Ignoring it.", person_guid); else { person_guid = buf; @@ -342,6 +342,8 @@ struct rrdhost { int rrd_history_entries; // the number of history entries for the host's charts int rrdpush_enabled; // 1 when this host sends metrics to another netdata + char *rrdpush_destination; // where to send metrics to + char *rrdpush_api_key; // the api key at the receiving netdata volatile int rrdpush_connected; // 1 when the sender is ready to push metrics volatile int rrdpush_spawn; // 1 when the sender thread has been spawn volatile int rrdpush_error_shown; // 1 when we have logged a communication error @@ -423,7 +425,18 @@ extern pthread_rwlock_t rrd_rwlock; extern void rrd_init(char *hostname); extern RRDHOST *rrdhost_find(const char *guid, uint32_t hash); -extern RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, const char *os, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled); +extern RRDHOST *rrdhost_find_or_create( + const char *hostname + , const char *guid + , const char *os + , int update_every + , int history + , RRD_MEMORY_MODE mode + , int health_enabled + , int rrdpush_enabled + , char *rrdpush_destination + , char *rrdpush_api_key +); #ifdef NETDATA_INTERNAL_CHECKS extern void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line); diff --git a/src/rrdhost.c b/src/rrdhost.c index 6e9d7baade..658fba0012 100644 --- a/src/rrdhost.c +++ b/src/rrdhost.c @@ -65,6 +65,9 @@ RRDHOST *rrdhost_create(const char *hostname, int entries, RRD_MEMORY_MODE memory_mode, int health_enabled, + int rrdpush_enabled, + char *rrdpush_destination, + char *rrdpush_api_key, int is_localhost ) { @@ -76,11 +79,13 @@ RRDHOST *rrdhost_create(const char *hostname, host->rrd_history_entries = entries; host->rrd_memory_mode = memory_mode; host->health_enabled = (memory_mode == RRD_MEMORY_MODE_NONE)? 0 : health_enabled; - host->rrdpush_enabled = default_rrdpush_enabled; + host->rrdpush_enabled = (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key); + host->rrdpush_destination = (host->rrdpush_enabled)?strdupz(rrdpush_destination):NULL; + host->rrdpush_api_key = (host->rrdpush_enabled)?strdupz(rrdpush_api_key):NULL; host->rrdpush_pipe[0] = -1; host->rrdpush_pipe[1] = -1; - host->rrdpush_socket = -1; + host->rrdpush_socket = -1; pthread_mutex_init(&host->rrdpush_mutex, NULL); pthread_rwlock_init(&host->rrdhost_rwlock, NULL); @@ -201,6 +206,7 @@ RRDHOST *rrdhost_create(const char *hostname, ", memory mode: %s" ", history entries: %d" ", streaming: %s" + " to: '%s' (api key: '%s')" ", health: %s" ", cache_dir: '%s'" ", varlib_dir: '%s'" @@ -214,6 +220,8 @@ RRDHOST *rrdhost_create(const char *hostname, , rrd_memory_mode_name(host->rrd_memory_mode) , host->rrd_history_entries , host->rrdpush_enabled?"enabled":"disabled" + , host->rrdpush_destination + , host->rrdpush_api_key , host->health_enabled?"enabled":"disabled" , host->cache_dir , host->varlib_dir @@ -228,12 +236,35 @@ RRDHOST *rrdhost_create(const char *hostname, return host; } -RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, const char *os, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled) { +RRDHOST *rrdhost_find_or_create( + const char *hostname + , const char *guid + , const char *os + , int update_every + , int history + , RRD_MEMORY_MODE mode + , int health_enabled + , int rrdpush_enabled + , char *rrdpush_destination + , char *rrdpush_api_key +) { debug(D_RRDHOST, "Searching for host '%s' with guid '%s'", hostname, guid); RRDHOST *host = rrdhost_find(guid, 0); if(!host) { - host = rrdhost_create(hostname, guid, os, update_every, history, mode, health_enabled, 0); + host = rrdhost_create( + hostname + , guid + , os + , update_every + , history + , mode + , health_enabled + , rrdpush_enabled + , rrdpush_destination + , rrdpush_api_key + , 0 + ); } else { host->health_enabled = health_enabled; @@ -267,14 +298,18 @@ void rrd_init(char *hostname) { rrdpush_init(); debug(D_RRDHOST, "Initializing localhost with hostname '%s'", hostname); - localhost = rrdhost_create(hostname, - registry_get_this_machine_guid(), - os_type, - default_rrd_update_every, - default_rrd_history_entries, - default_rrd_memory_mode, - default_health_enabled, - 1 + localhost = rrdhost_create( + hostname + , registry_get_this_machine_guid() + , os_type + , default_rrd_update_every + , default_rrd_history_entries + , default_rrd_memory_mode + , default_health_enabled + , default_rrdpush_enabled + , default_rrdpush_destination + , default_rrdpush_api_key + , 1 ); } @@ -369,6 +404,8 @@ void rrdhost_free(RRDHOST *host) { freez(host->os); freez(host->cache_dir); freez(host->varlib_dir); + freez(host->rrdpush_api_key); + freez(host->rrdpush_destination); freez(host->health_default_exec); freez(host->health_default_recipient); freez(host->health_log_filename); diff --git a/src/rrdpush.c b/src/rrdpush.c index 218bcc39e3..67bb552aa6 100644 --- a/src/rrdpush.c +++ b/src/rrdpush.c @@ -23,18 +23,22 @@ * */ +#define START_STREAMING_PROMPT "Hit me baby, push them over..." + int default_rrdpush_enabled = 0; -static char *rrdpush_destination = NULL; -static char *rrdpush_api_key = NULL; +char *default_rrdpush_destination = NULL; +char *default_rrdpush_api_key = NULL; int rrdpush_init() { - default_rrdpush_enabled = config_get_boolean(CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled); - rrdpush_destination = config_get(CONFIG_SECTION_STREAM, "destination", ""); - rrdpush_api_key = config_get(CONFIG_SECTION_STREAM, "api key", ""); + default_rrdpush_enabled = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled); + default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", ""); + default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", ""); - if(default_rrdpush_enabled && (!rrdpush_destination || !*rrdpush_destination || !rrdpush_api_key || !*rrdpush_api_key)) { + if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) { error("STREAM [send]: cannot enable sending thread - information is missing."); default_rrdpush_enabled = 0; + default_rrdpush_api_key = NULL; + default_rrdpush_destination = NULL; } return default_rrdpush_enabled; @@ -237,14 +241,14 @@ void *rrdpush_sender_thread(void *ptr) { if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname); - int timeout = (int)config_get_number(CONFIG_SECTION_STREAM, "timeout seconds", 60); - int default_port = (int)config_get_number(CONFIG_SECTION_STREAM, "default port", 19999); - size_t max_size = (size_t)config_get_number(CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024); - unsigned int reconnect_delay = (unsigned int)config_get_number(CONFIG_SECTION_STREAM, "reconnect delay seconds", 5); - remote_clock_resync_iterations = (unsigned int)config_get_number(CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations); + int timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60); + int default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999); + size_t max_size = (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024); + unsigned int reconnect_delay = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5); + remote_clock_resync_iterations = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations); char connected_to[CONNECTED_TO_SIZE + 1] = ""; - if(!host->rrdpush_enabled || !rrdpush_destination || !*rrdpush_destination || !rrdpush_api_key || !*rrdpush_api_key) + if(!host->rrdpush_enabled || !host->rrdpush_destination || !*host->rrdpush_destination || !host->rrdpush_api_key || !*host->rrdpush_api_key) goto cleanup; // initialize rrdpush globals @@ -276,11 +280,11 @@ void *rrdpush_sender_thread(void *ptr) { // they will be lost, so there is no point to do it host->rrdpush_connected = 0; - info("STREAM %s [send to %s]: connecting...", host->hostname, rrdpush_destination); - host->rrdpush_socket = connect_to_one_of(rrdpush_destination, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE); + info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_destination); + host->rrdpush_socket = connect_to_one_of(host->rrdpush_destination, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE); if(unlikely(host->rrdpush_socket == -1)) { - error("STREAM %s [send to %s]: failed to connect", host->hostname, rrdpush_destination); + error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_destination); sleep(reconnect_delay); continue; } @@ -292,7 +296,7 @@ void *rrdpush_sender_thread(void *ptr) { "STREAM key=%s&hostname=%s&machine_guid=%s&os=%s&update_every=%d HTTP/1.1\r\n" "User-Agent: netdata-push-service/%s\r\n" "Accept: */*\r\n\r\n" - , rrdpush_api_key + , host->rrdpush_api_key , host->hostname , host->machine_guid , host->os @@ -318,7 +322,7 @@ void *rrdpush_sender_thread(void *ptr) { continue; } - if(strncmp(http, "STREAM", 6)) { + if(strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT))) { close(host->rrdpush_socket); host->rrdpush_socket = -1; error("STREAM %s [send to %s]: server is not replying properly.", host->hostname, connected_to); @@ -428,6 +432,9 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m int history = default_rrd_history_entries; RRD_MEMORY_MODE mode = default_rrd_memory_mode; int health_enabled = default_health_enabled; + int rrdpush_enabled = default_rrdpush_enabled; + char *rrdpush_destination = default_rrdpush_destination; + char *rrdpush_api_key = default_rrdpush_api_key; time_t alarms_delay = 60; update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every); @@ -446,10 +453,30 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m alarms_delay = appconfig_get_number(&stream_config, key, "default postpone alarms on connect seconds", alarms_delay); alarms_delay = appconfig_get_number(&stream_config, machine_guid, "postpone alarms on connect seconds", alarms_delay); + rrdpush_enabled = appconfig_get_boolean(&stream_config, key, "default proxy enabled", rrdpush_enabled); + rrdpush_enabled = appconfig_get_boolean(&stream_config, machine_guid, "proxy enabled", rrdpush_enabled); + + rrdpush_destination = appconfig_get(&stream_config, key, "default proxy destination", rrdpush_destination); + rrdpush_destination = appconfig_get(&stream_config, machine_guid, "proxy destination", rrdpush_destination); + + rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key); + rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key); + if(!strcmp(machine_guid, "localhost")) host = localhost; else - host = rrdhost_find_or_create(hostname, machine_guid, os, update_every, history, mode, (health_enabled == CONFIG_BOOLEAN_NO)?0:1); + host = rrdhost_find_or_create( + hostname + , machine_guid + , os + , update_every + , history + , mode + , (health_enabled != CONFIG_BOOLEAN_NO) + , (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) + , rrdpush_destination + , rrdpush_api_key + ); if(!host) { close(fd); @@ -457,7 +484,8 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m return 1; } - info("STREAM %s [receive from [%s]:%s]: metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s" +#ifdef NETDATA_INTERNAL_CHECKS + info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s" , hostname , client_ip , client_port @@ -468,6 +496,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m , rrd_memory_mode_name(host->rrd_memory_mode) , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") ); +#endif // NETDATA_INTERNAL_CHECKS struct plugind cd = { .enabled = 1, @@ -487,8 +516,8 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port); info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port); - if(send_timeout(fd, "STREAM", 6, 0, 60) != 6) { - error("STREAM %s [receive from [%s]:%s]: cannot send STREAM command.", host->hostname, client_ip, client_port); + if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) { + error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port); return 0; } @@ -510,9 +539,9 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m rrdhost_unlock(host); // call the plugins.d processor to receive the metrics - info("STREAM %s [receive from [%s]:%s]: receiving metrics... (host '%s', machine GUID '%s').", host->hostname, client_ip, client_port, host->hostname, host->machine_guid); + info("STREAM %s [receive from [%s]:%s]: receiving metrics...", host->hostname, client_ip, client_port); size_t count = pluginsd_process(host, &cd, fp, 1); - error("STREAM %s [receive from [%s]:%s]: disconnected (host '%s', machine GUID '%s', completed updates %zu).", host->hostname, client_ip, client_port, host->hostname, host->machine_guid, count); + error("STREAM %s [receive from [%s]:%s]: disconnected (completed updates %zu).", host->hostname, client_ip, client_port, count); rrdhost_wrlock(host); host->use_counter--; @@ -564,10 +593,6 @@ void *rrdpush_receiver_thread(void *ptr) { return NULL; } -static inline int rrdpush_receive_validate_api_key(const char *key) { - return appconfig_get_boolean(&stream_config, key, "enabled", 0); -} - void rrdpush_sender_thread_spawn(RRDHOST *host) { if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *)host)) error("STREAM %s [send]: failed to create new thread for client.", host->hostname); @@ -585,6 +610,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = NULL; int update_every = default_rrd_update_every; + char buf[GUID_LEN + 1]; while(url) { char *value = mystrsep(&url, "?&"); @@ -627,8 +653,22 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url return 400; } - if(!rrdpush_receive_validate_api_key(key)) { - error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key); + if(regenerate_guid(key, buf) == -1) { + error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID. Forbidding access.", w->client_ip, w->client_port, key); + buffer_flush(w->response.data); + buffer_sprintf(w->response.data, "Your API key is invalid."); + return 401; + } + + if(regenerate_guid(machine_guid, buf) == -1) { + error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, key); + buffer_flush(w->response.data); + buffer_sprintf(w->response.data, "Your machine GUID is invalid."); + return 404; + } + + if(!appconfig_get_boolean(&stream_config, key, "enabled", 1)) { + error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid); buffer_flush(w->response.data); buffer_sprintf(w->response.data, "Your API key is not permitted access."); return 401; diff --git a/src/rrdpush.h b/src/rrdpush.h index 6a7d8e59a9..ef3545f8f4 100644 --- a/src/rrdpush.h +++ b/src/rrdpush.h @@ -2,6 +2,8 @@ #define NETDATA_RRDPUSH_H extern int default_rrdpush_enabled; +extern char *default_rrdpush_destination; +extern char *default_rrdpush_api_key; extern int rrdpush_init(); extern void rrdset_done_push(RRDSET *st); |