diff options
Diffstat (limited to 'src/rrdpush.c')
-rw-r--r-- | src/rrdpush.c | 98 |
1 files changed, 69 insertions, 29 deletions
diff --git a/src/rrdpush.c b/src/rrdpush.c index 218bcc39e3..67bb552aa6 100644 --- a/src/rrdpush.c +++ b/src/rrdpush.c @@ -23,18 +23,22 @@ * */ +#define START_STREAMING_PROMPT "Hit me baby, push them over..." + int default_rrdpush_enabled = 0; -static char *rrdpush_destination = NULL; -static char *rrdpush_api_key = NULL; +char *default_rrdpush_destination = NULL; +char *default_rrdpush_api_key = NULL; int rrdpush_init() { - default_rrdpush_enabled = config_get_boolean(CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled); - rrdpush_destination = config_get(CONFIG_SECTION_STREAM, "destination", ""); - rrdpush_api_key = config_get(CONFIG_SECTION_STREAM, "api key", ""); + default_rrdpush_enabled = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled); + default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", ""); + default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", ""); - if(default_rrdpush_enabled && (!rrdpush_destination || !*rrdpush_destination || !rrdpush_api_key || !*rrdpush_api_key)) { + if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) { error("STREAM [send]: cannot enable sending thread - information is missing."); default_rrdpush_enabled = 0; + default_rrdpush_api_key = NULL; + default_rrdpush_destination = NULL; } return default_rrdpush_enabled; @@ -237,14 +241,14 @@ void *rrdpush_sender_thread(void *ptr) { if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname); - int timeout = (int)config_get_number(CONFIG_SECTION_STREAM, "timeout seconds", 60); - int default_port = (int)config_get_number(CONFIG_SECTION_STREAM, "default port", 19999); - size_t max_size = (size_t)config_get_number(CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024); - unsigned int reconnect_delay = (unsigned int)config_get_number(CONFIG_SECTION_STREAM, "reconnect delay seconds", 5); - remote_clock_resync_iterations = (unsigned int)config_get_number(CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations); + int timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60); + int default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999); + size_t max_size = (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024); + unsigned int reconnect_delay = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5); + remote_clock_resync_iterations = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations); char connected_to[CONNECTED_TO_SIZE + 1] = ""; - if(!host->rrdpush_enabled || !rrdpush_destination || !*rrdpush_destination || !rrdpush_api_key || !*rrdpush_api_key) + if(!host->rrdpush_enabled || !host->rrdpush_destination || !*host->rrdpush_destination || !host->rrdpush_api_key || !*host->rrdpush_api_key) goto cleanup; // initialize rrdpush globals @@ -276,11 +280,11 @@ void *rrdpush_sender_thread(void *ptr) { // they will be lost, so there is no point to do it host->rrdpush_connected = 0; - info("STREAM %s [send to %s]: connecting...", host->hostname, rrdpush_destination); - host->rrdpush_socket = connect_to_one_of(rrdpush_destination, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE); + info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_destination); + host->rrdpush_socket = connect_to_one_of(host->rrdpush_destination, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE); if(unlikely(host->rrdpush_socket == -1)) { - error("STREAM %s [send to %s]: failed to connect", host->hostname, rrdpush_destination); + error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_destination); sleep(reconnect_delay); continue; } @@ -292,7 +296,7 @@ void *rrdpush_sender_thread(void *ptr) { "STREAM key=%s&hostname=%s&machine_guid=%s&os=%s&update_every=%d HTTP/1.1\r\n" "User-Agent: netdata-push-service/%s\r\n" "Accept: */*\r\n\r\n" - , rrdpush_api_key + , host->rrdpush_api_key , host->hostname , host->machine_guid , host->os @@ -318,7 +322,7 @@ void *rrdpush_sender_thread(void *ptr) { continue; } - if(strncmp(http, "STREAM", 6)) { + if(strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT))) { close(host->rrdpush_socket); host->rrdpush_socket = -1; error("STREAM %s [send to %s]: server is not replying properly.", host->hostname, connected_to); @@ -428,6 +432,9 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m int history = default_rrd_history_entries; RRD_MEMORY_MODE mode = default_rrd_memory_mode; int health_enabled = default_health_enabled; + int rrdpush_enabled = default_rrdpush_enabled; + char *rrdpush_destination = default_rrdpush_destination; + char *rrdpush_api_key = default_rrdpush_api_key; time_t alarms_delay = 60; update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every); @@ -446,10 +453,30 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m alarms_delay = appconfig_get_number(&stream_config, key, "default postpone alarms on connect seconds", alarms_delay); alarms_delay = appconfig_get_number(&stream_config, machine_guid, "postpone alarms on connect seconds", alarms_delay); + rrdpush_enabled = appconfig_get_boolean(&stream_config, key, "default proxy enabled", rrdpush_enabled); + rrdpush_enabled = appconfig_get_boolean(&stream_config, machine_guid, "proxy enabled", rrdpush_enabled); + + rrdpush_destination = appconfig_get(&stream_config, key, "default proxy destination", rrdpush_destination); + rrdpush_destination = appconfig_get(&stream_config, machine_guid, "proxy destination", rrdpush_destination); + + rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key); + rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key); + if(!strcmp(machine_guid, "localhost")) host = localhost; else - host = rrdhost_find_or_create(hostname, machine_guid, os, update_every, history, mode, (health_enabled == CONFIG_BOOLEAN_NO)?0:1); + host = rrdhost_find_or_create( + hostname + , machine_guid + , os + , update_every + , history + , mode + , (health_enabled != CONFIG_BOOLEAN_NO) + , (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) + , rrdpush_destination + , rrdpush_api_key + ); if(!host) { close(fd); @@ -457,7 +484,8 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m return 1; } - info("STREAM %s [receive from [%s]:%s]: metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s" +#ifdef NETDATA_INTERNAL_CHECKS + info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s" , hostname , client_ip , client_port @@ -468,6 +496,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m , rrd_memory_mode_name(host->rrd_memory_mode) , (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto") ); +#endif // NETDATA_INTERNAL_CHECKS struct plugind cd = { .enabled = 1, @@ -487,8 +516,8 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port); info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port); - if(send_timeout(fd, "STREAM", 6, 0, 60) != 6) { - error("STREAM %s [receive from [%s]:%s]: cannot send STREAM command.", host->hostname, client_ip, client_port); + if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) { + error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port); return 0; } @@ -510,9 +539,9 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m rrdhost_unlock(host); // call the plugins.d processor to receive the metrics - info("STREAM %s [receive from [%s]:%s]: receiving metrics... (host '%s', machine GUID '%s').", host->hostname, client_ip, client_port, host->hostname, host->machine_guid); + info("STREAM %s [receive from [%s]:%s]: receiving metrics...", host->hostname, client_ip, client_port); size_t count = pluginsd_process(host, &cd, fp, 1); - error("STREAM %s [receive from [%s]:%s]: disconnected (host '%s', machine GUID '%s', completed updates %zu).", host->hostname, client_ip, client_port, host->hostname, host->machine_guid, count); + error("STREAM %s [receive from [%s]:%s]: disconnected (completed updates %zu).", host->hostname, client_ip, client_port, count); rrdhost_wrlock(host); host->use_counter--; @@ -564,10 +593,6 @@ void *rrdpush_receiver_thread(void *ptr) { return NULL; } -static inline int rrdpush_receive_validate_api_key(const char *key) { - return appconfig_get_boolean(&stream_config, key, "enabled", 0); -} - void rrdpush_sender_thread_spawn(RRDHOST *host) { if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *)host)) error("STREAM %s [send]: failed to create new thread for client.", host->hostname); @@ -585,6 +610,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = NULL; int update_every = default_rrd_update_every; + char buf[GUID_LEN + 1]; while(url) { char *value = mystrsep(&url, "?&"); @@ -627,8 +653,22 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url return 400; } - if(!rrdpush_receive_validate_api_key(key)) { - error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key); + if(regenerate_guid(key, buf) == -1) { + error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID. Forbidding access.", w->client_ip, w->client_port, key); + buffer_flush(w->response.data); + buffer_sprintf(w->response.data, "Your API key is invalid."); + return 401; + } + + if(regenerate_guid(machine_guid, buf) == -1) { + error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, key); + buffer_flush(w->response.data); + buffer_sprintf(w->response.data, "Your machine GUID is invalid."); + return 404; + } + + if(!appconfig_get_boolean(&stream_config, key, "enabled", 1)) { + error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid); buffer_flush(w->response.data); buffer_sprintf(w->response.data, "Your API key is not permitted access."); return 401; |