summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--conf.d/Makefile.am2
-rw-r--r--conf.d/stream.conf (renamed from conf.d/aggregated_hosts.conf)91
-rw-r--r--src/main.c21
-rw-r--r--src/registry.h1
-rw-r--r--src/registry_internals.c12
-rw-r--r--src/registry_internals.h2
-rw-r--r--src/registry_machine.c2
-rw-r--r--src/registry_person.c2
-rw-r--r--src/rrd.h15
-rw-r--r--src/rrdhost.c61
-rw-r--r--src/rrdpush.c98
-rw-r--r--src/rrdpush.h2
12 files changed, 213 insertions, 96 deletions
diff --git a/conf.d/Makefile.am b/conf.d/Makefile.am
index dd6c2cbf9e..743ab19e93 100644
--- a/conf.d/Makefile.am
+++ b/conf.d/Makefile.am
@@ -4,7 +4,6 @@
MAINTAINERCLEANFILES= $(srcdir)/Makefile.in
dist_config_DATA = \
- aggregated_hosts.conf \
apps_groups.conf \
charts.d.conf \
fping.conf \
@@ -12,6 +11,7 @@ dist_config_DATA = \
python.d.conf \
health_alarm_notify.conf \
health_email_recipients.conf \
+ stream.conf \
$(NULL)
nodeconfigdir=$(configdir)/node.d
diff --git a/conf.d/aggregated_hosts.conf b/conf.d/stream.conf
index d192a55eb8..2dcb25a629 100644
--- a/conf.d/aggregated_hosts.conf
+++ b/conf.d/stream.conf
@@ -8,27 +8,31 @@
#
# -----------------------------------------------------------------------------
# 1. SLAVE NETDATA - THE ONE THAT WILL BE SENDING METRICS
-#
-# In /etc/netdata/netdata.conf you have a configuration like this:
-#
-# [stream]
-#
-# # enable or disable sending metrics.
-# # default: no
-# enabled = yes
-#
-# # one or more netdata hosts to send metrics to.
-# # only one will get them, the first available.
-# destination = HOST1:PORT1 HOST2:PORT2 ...
-#
-# # the API key to use, to authorize ourselves
-# api key = API_KEY
-#
+
+[stream]
+ enabled = no
+
+ # where to send metrics to?
+ # A space separated list of IP:PORT is accepted. The first available will
+ # get the metrics.
+ # IPv6 addresses should be [IP]:PORT
+ destination =
+
+ # The API_KEY to use (as the sender)
+ api key =
+
+ # other options (uncomment and set)
+
+ # timeout seconds = 60
+ # default port = 19999
+ # buffer size bytes = 1048576
+ # reconnect delay seconds = 5
+ # initial clock resync iterations = 60
+
+
# -----------------------------------------------------------------------------
# 2. MASTER NETDATA - THE ONE THAT WILL BE RECEIVING METRICS
#
-# Use this file to define the API keys is accepts.
-#
# You can have one API key per slave, or the same API key for all slaves.
#
# All options below are used in this order:
@@ -38,8 +42,6 @@
# c) this netdata defaults (as in netdata.conf)
#
# You can combine the above (the more specific setting will be used).
-#
-# -----------------------------------------------------------------------------
# API key authentication
# If the key is not listed here, it will not be able to connect.
@@ -49,29 +51,41 @@
# You can disable the API key, by setting this to: no
# The default (for unknown API keys) is also: no
-# enabled = yes
+ enabled = no
# The default history in entries, for all hosts using this API key.
# You can also set it per host below.
# If you don't set it here, the history size of the central netdata
# will be used
-# default history = 3600
+ default history = 3600
# The default memory mode to be used for all hosts using this API key.
# You can also set it per host below.
- # If you don't set it here, the memory mode of the central netdata
- # will be used.
- # Valid modes: save (load/save db), map (like swap), ram (no disk)
-# default memory mode = save
+ # If you don't set it here, the memory mode of netdata.conf will be used.
+ # Valid modes:
+ # save save on exit, load on start
+ # map like swap (continuously syncing to disks)
+ # ram keep it in RAM, don't touch the disk
+ # none no database (passing through this netdata)
+ default memory mode = ram
# Shall we enable health monitoring for the hosts using this API key?
# 3 values:
# yes enable alarms
# no do not enable alarms
- # auto enable alarms, only when the host is streaming metrics
+ # auto enable alarms, only when the sending netdata is connected
# You can also set it per host, below.
- # The default is the same as the central netdata
-# health enabled by default = auto
+ # The default is the same as to netdata.conf
+ health enabled by default = auto
+
+ # postpone alarms for a short period after the sender is connected
+ default postpone alarms on connect seconds = 60
+
+ # need to route metrics differently? set these.
+ # the defaults are the ones at the [stream] section
+ #default proxy enabled = yes | no
+ #default proxy destination = IP:PORT IP:PORT ...
+ #default proxy api key = API_KEY
# -----------------------------------------------------------------------------
@@ -81,17 +95,24 @@
# you can give settings for each specific host here.
[MACHINE_GUID]
- # This can be used to stop receiving data
+ # enable this host: yes | no
# THIS IS NOT A SECURITY MECHANISM - AN ATTACKER CAN SET ANY OTHER GUID.
# Use only the API key for security.
-# enabled = yes
+ enabled = no
# The number of entries in the database
-# history = 3600
+ history = 3600
- # The memory mode of the database
-# memory mode = save
+ # The memory mode of the database: save | map | ram | none
+ memory mode = save
# Health / alarms control: yes | no | auto
-# health enabled = yes
+ health enabled = yes
+
+ # postpone alarms when the sender connects
+ postpone alarms on connect seconds = 60
+ # need to route metrics differently?
+ #proxy enabled = yes | no
+ #proxy destination = IP:PORT IP:PORT ...
+ #proxy api key = API_KEY
diff --git a/src/main.c b/src/main.c
index 7f590aca60..ae8f599aae 100644
--- a/src/main.c
+++ b/src/main.c
@@ -705,6 +705,8 @@ int main(int argc, char **argv) {
if(!config_loaded)
config_load(NULL, 0);
+ // ------------------------------------------------------------------------
+ // initialize netdata
{
char *pmax = config_get(CONFIG_SECTION_GLOBAL, "glibc malloc arena max for plugins", "1");
if(pmax && *pmax)
@@ -757,6 +759,16 @@ int main(int argc, char **argv) {
log_init();
error_log_limit_unlimited();
+
+ // --------------------------------------------------------------------
+ // load stream.conf
+ {
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "%s/stream.conf", netdata_configured_config_dir);
+ appconfig_load(&stream_config, filename, 0);
+ }
+
+
// --------------------------------------------------------------------
// setup process signals
@@ -851,15 +863,6 @@ int main(int argc, char **argv) {
if(web_server_mode != WEB_SERVER_MODE_NONE)
create_listen_sockets();
-
-
- // --------------------------------------------------------------------
- // load the aggregated host configuration file
- {
- char filename[FILENAME_MAX + 1];
- snprintfz(filename, FILENAME_MAX, "%s/aggregated_hosts.conf", netdata_configured_config_dir);
- appconfig_load(&stream_config, filename, 0);
- }
}
// initialize the log files
diff --git a/src/registry.h b/src/registry.h
index 7940cbb7a9..2c4592b922 100644
--- a/src/registry.h
+++ b/src/registry.h
@@ -70,5 +70,6 @@ extern int registry_request_hello_json(RRDHOST *host, struct web_client *w);
extern void registry_statistics(void);
extern char *registry_get_this_machine_guid(void);
+extern int regenerate_guid(const char *guid, char *result);
#endif /* NETDATA_REGISTRY_H */
diff --git a/src/registry_internals.c b/src/registry_internals.c
index df330cc8a7..9ec91ba401 100644
--- a/src/registry_internals.c
+++ b/src/registry_internals.c
@@ -7,7 +7,7 @@ struct registry registry;
// parse a GUID and re-generated to be always lower case
// this is used as a protection against the variations of GUIDs
-int registry_regenerate_guid(const char *guid, char *result) {
+int regenerate_guid(const char *guid, char *result) {
uuid_t uuid;
if(unlikely(uuid_parse(guid, uuid) == -1)) {
info("Registry: GUID '%s' is not a valid GUID.", guid);
@@ -18,7 +18,7 @@ int registry_regenerate_guid(const char *guid, char *result) {
#ifdef NETDATA_INTERNAL_CHECKS
if(strcmp(guid, result))
- info("Registry: source GUID '%s' and re-generated GUID '%s' differ!", guid, result);
+ info("GUID '%s' and re-generated GUID '%s' differ!", guid, result);
#endif /* NETDATA_INTERNAL_CHECKS */
}
@@ -96,14 +96,14 @@ REGISTRY_PERSON_URL *registry_verify_request(char *person_guid, char *machine_gu
url = registry_fix_url(url, NULL);
// make sure the person GUID is valid
- if(registry_regenerate_guid(person_guid, pbuf) == -1) {
+ if(regenerate_guid(person_guid, pbuf) == -1) {
info("Registry Request Verification: invalid person GUID, person: '%s', machine '%s', url '%s'", person_guid, machine_guid, url);
return NULL;
}
person_guid = pbuf;
// make sure the machine GUID is valid
- if(registry_regenerate_guid(machine_guid, mbuf) == -1) {
+ if(regenerate_guid(machine_guid, mbuf) == -1) {
info("Registry Request Verification: invalid machine GUID, person: '%s', machine '%s', url '%s'", person_guid, machine_guid, url);
return NULL;
}
@@ -226,7 +226,7 @@ REGISTRY_MACHINE *registry_request_machine(char *person_guid, char *machine_guid
if(!pu || !p || !m) return NULL;
// make sure the machine GUID is valid
- if(registry_regenerate_guid(request_machine, mbuf) == -1) {
+ if(regenerate_guid(request_machine, mbuf) == -1) {
info("Registry Machine URLs request: invalid machine GUID, person: '%s', machine '%s', url '%s', request machine '%s'", p->guid, m->guid, pu->url->url, request_machine);
return NULL;
}
@@ -288,7 +288,7 @@ char *registry_get_this_machine_guid(void) {
error("Failed to read machine GUID from '%s'", registry.machine_guid_filename);
else {
buf[GUID_LEN] = '\0';
- if(registry_regenerate_guid(buf, guid) == -1) {
+ if(regenerate_guid(buf, guid) == -1) {
error("Failed to validate machine GUID '%s' from '%s'. Ignoring it - this might mean this netdata will appear as duplicate in the registry.",
buf, registry.machine_guid_filename);
diff --git a/src/registry_internals.h b/src/registry_internals.h
index 8aef16b22a..27c2fe0745 100644
--- a/src/registry_internals.h
+++ b/src/registry_internals.h
@@ -59,7 +59,7 @@ struct registry {
pthread_mutex_t lock;
};
-extern int registry_regenerate_guid(const char *guid, char *result);
+extern int regenerate_guid(const char *guid, char *result);
#include "registry_url.h"
#include "registry_machine.h"
diff --git a/src/registry_machine.c b/src/registry_machine.c
index 3510736df4..6dc8200d39 100644
--- a/src/registry_machine.c
+++ b/src/registry_machine.c
@@ -58,7 +58,7 @@ REGISTRY_MACHINE *registry_machine_get(const char *machine_guid, time_t when) {
if(likely(machine_guid && *machine_guid)) {
// validate it is a GUID
char buf[GUID_LEN + 1];
- if(unlikely(registry_regenerate_guid(machine_guid, buf) == -1))
+ if(unlikely(regenerate_guid(machine_guid, buf) == -1))
info("Registry: machine guid '%s' is not a valid guid. Ignoring it.", machine_guid);
else {
machine_guid = buf;
diff --git a/src/registry_person.c b/src/registry_person.c
index 5f9099c9aa..409c76925b 100644
--- a/src/registry_person.c
+++ b/src/registry_person.c
@@ -183,7 +183,7 @@ REGISTRY_PERSON *registry_person_get(const char *person_guid, time_t when) {
if(person_guid && *person_guid) {
char buf[GUID_LEN + 1];
// validate it is a GUID
- if(unlikely(registry_regenerate_guid(person_guid, buf) == -1))
+ if(unlikely(regenerate_guid(person_guid, buf) == -1))
info("Registry: person guid '%s' is not a valid guid. Ignoring it.", person_guid);
else {
person_guid = buf;
diff --git a/src/rrd.h b/src/rrd.h
index 2cb06607f5..14ef0f7212 100644
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -342,6 +342,8 @@ struct rrdhost {
int rrd_history_entries; // the number of history entries for the host's charts
int rrdpush_enabled; // 1 when this host sends metrics to another netdata
+ char *rrdpush_destination; // where to send metrics to
+ char *rrdpush_api_key; // the api key at the receiving netdata
volatile int rrdpush_connected; // 1 when the sender is ready to push metrics
volatile int rrdpush_spawn; // 1 when the sender thread has been spawn
volatile int rrdpush_error_shown; // 1 when we have logged a communication error
@@ -423,7 +425,18 @@ extern pthread_rwlock_t rrd_rwlock;
extern void rrd_init(char *hostname);
extern RRDHOST *rrdhost_find(const char *guid, uint32_t hash);
-extern RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, const char *os, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled);
+extern RRDHOST *rrdhost_find_or_create(
+ const char *hostname
+ , const char *guid
+ , const char *os
+ , int update_every
+ , int history
+ , RRD_MEMORY_MODE mode
+ , int health_enabled
+ , int rrdpush_enabled
+ , char *rrdpush_destination
+ , char *rrdpush_api_key
+);
#ifdef NETDATA_INTERNAL_CHECKS
extern void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
diff --git a/src/rrdhost.c b/src/rrdhost.c
index 6e9d7baade..658fba0012 100644
--- a/src/rrdhost.c
+++ b/src/rrdhost.c
@@ -65,6 +65,9 @@ RRDHOST *rrdhost_create(const char *hostname,
int entries,
RRD_MEMORY_MODE memory_mode,
int health_enabled,
+ int rrdpush_enabled,
+ char *rrdpush_destination,
+ char *rrdpush_api_key,
int is_localhost
) {
@@ -76,11 +79,13 @@ RRDHOST *rrdhost_create(const char *hostname,
host->rrd_history_entries = entries;
host->rrd_memory_mode = memory_mode;
host->health_enabled = (memory_mode == RRD_MEMORY_MODE_NONE)? 0 : health_enabled;
- host->rrdpush_enabled = default_rrdpush_enabled;
+ host->rrdpush_enabled = (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key);
+ host->rrdpush_destination = (host->rrdpush_enabled)?strdupz(rrdpush_destination):NULL;
+ host->rrdpush_api_key = (host->rrdpush_enabled)?strdupz(rrdpush_api_key):NULL;
host->rrdpush_pipe[0] = -1;
host->rrdpush_pipe[1] = -1;
- host->rrdpush_socket = -1;
+ host->rrdpush_socket = -1;
pthread_mutex_init(&host->rrdpush_mutex, NULL);
pthread_rwlock_init(&host->rrdhost_rwlock, NULL);
@@ -201,6 +206,7 @@ RRDHOST *rrdhost_create(const char *hostname,
", memory mode: %s"
", history entries: %d"
", streaming: %s"
+ " to: '%s' (api key: '%s')"
", health: %s"
", cache_dir: '%s'"
", varlib_dir: '%s'"
@@ -214,6 +220,8 @@ RRDHOST *rrdhost_create(const char *hostname,
, rrd_memory_mode_name(host->rrd_memory_mode)
, host->rrd_history_entries
, host->rrdpush_enabled?"enabled":"disabled"
+ , host->rrdpush_destination
+ , host->rrdpush_api_key
, host->health_enabled?"enabled":"disabled"
, host->cache_dir
, host->varlib_dir
@@ -228,12 +236,35 @@ RRDHOST *rrdhost_create(const char *hostname,
return host;
}
-RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, const char *os, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled) {
+RRDHOST *rrdhost_find_or_create(
+ const char *hostname
+ , const char *guid
+ , const char *os
+ , int update_every
+ , int history
+ , RRD_MEMORY_MODE mode
+ , int health_enabled
+ , int rrdpush_enabled
+ , char *rrdpush_destination
+ , char *rrdpush_api_key
+) {
debug(D_RRDHOST, "Searching for host '%s' with guid '%s'", hostname, guid);
RRDHOST *host = rrdhost_find(guid, 0);
if(!host) {
- host = rrdhost_create(hostname, guid, os, update_every, history, mode, health_enabled, 0);
+ host = rrdhost_create(
+ hostname
+ , guid
+ , os
+ , update_every
+ , history
+ , mode
+ , health_enabled
+ , rrdpush_enabled
+ , rrdpush_destination
+ , rrdpush_api_key
+ , 0
+ );
}
else {
host->health_enabled = health_enabled;
@@ -267,14 +298,18 @@ void rrd_init(char *hostname) {
rrdpush_init();
debug(D_RRDHOST, "Initializing localhost with hostname '%s'", hostname);
- localhost = rrdhost_create(hostname,
- registry_get_this_machine_guid(),
- os_type,
- default_rrd_update_every,
- default_rrd_history_entries,
- default_rrd_memory_mode,
- default_health_enabled,
- 1
+ localhost = rrdhost_create(
+ hostname
+ , registry_get_this_machine_guid()
+ , os_type
+ , default_rrd_update_every
+ , default_rrd_history_entries
+ , default_rrd_memory_mode
+ , default_health_enabled
+ , default_rrdpush_enabled
+ , default_rrdpush_destination
+ , default_rrdpush_api_key
+ , 1
);
}
@@ -369,6 +404,8 @@ void rrdhost_free(RRDHOST *host) {
freez(host->os);
freez(host->cache_dir);
freez(host->varlib_dir);
+ freez(host->rrdpush_api_key);
+ freez(host->rrdpush_destination);
freez(host->health_default_exec);
freez(host->health_default_recipient);
freez(host->health_log_filename);
diff --git a/src/rrdpush.c b/src/rrdpush.c
index 218bcc39e3..67bb552aa6 100644
--- a/src/rrdpush.c
+++ b/src/rrdpush.c
@@ -23,18 +23,22 @@
*
*/
+#define START_STREAMING_PROMPT "Hit me baby, push them over..."
+
int default_rrdpush_enabled = 0;
-static char *rrdpush_destination = NULL;
-static char *rrdpush_api_key = NULL;
+char *default_rrdpush_destination = NULL;
+char *default_rrdpush_api_key = NULL;
int rrdpush_init() {
- default_rrdpush_enabled = config_get_boolean(CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
- rrdpush_destination = config_get(CONFIG_SECTION_STREAM, "destination", "");
- rrdpush_api_key = config_get(CONFIG_SECTION_STREAM, "api key", "");
+ default_rrdpush_enabled = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
+ default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
+ default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
- if(default_rrdpush_enabled && (!rrdpush_destination || !*rrdpush_destination || !rrdpush_api_key || !*rrdpush_api_key)) {
+ if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
error("STREAM [send]: cannot enable sending thread - information is missing.");
default_rrdpush_enabled = 0;
+ default_rrdpush_api_key = NULL;
+ default_rrdpush_destination = NULL;
}
return default_rrdpush_enabled;
@@ -237,14 +241,14 @@ void *rrdpush_sender_thread(void *ptr) {
if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname);
- int timeout = (int)config_get_number(CONFIG_SECTION_STREAM, "timeout seconds", 60);
- int default_port = (int)config_get_number(CONFIG_SECTION_STREAM, "default port", 19999);
- size_t max_size = (size_t)config_get_number(CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024);
- unsigned int reconnect_delay = (unsigned int)config_get_number(CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
- remote_clock_resync_iterations = (unsigned int)config_get_number(CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations);
+ int timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60);
+ int default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999);
+ size_t max_size = (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024);
+ unsigned int reconnect_delay = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5);
+ remote_clock_resync_iterations = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations);
char connected_to[CONNECTED_TO_SIZE + 1] = "";
- if(!host->rrdpush_enabled || !rrdpush_destination || !*rrdpush_destination || !rrdpush_api_key || !*rrdpush_api_key)
+ if(!host->rrdpush_enabled || !host->rrdpush_destination || !*host->rrdpush_destination || !host->rrdpush_api_key || !*host->rrdpush_api_key)
goto cleanup;
// initialize rrdpush globals
@@ -276,11 +280,11 @@ void *rrdpush_sender_thread(void *ptr) {
// they will be lost, so there is no point to do it
host->rrdpush_connected = 0;
- info("STREAM %s [send to %s]: connecting...", host->hostname, rrdpush_destination);
- host->rrdpush_socket = connect_to_one_of(rrdpush_destination, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
+ info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_destination);
+ host->rrdpush_socket = connect_to_one_of(host->rrdpush_destination, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
if(unlikely(host->rrdpush_socket == -1)) {
- error("STREAM %s [send to %s]: failed to connect", host->hostname, rrdpush_destination);
+ error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_destination);
sleep(reconnect_delay);
continue;
}
@@ -292,7 +296,7 @@ void *rrdpush_sender_thread(void *ptr) {
"STREAM key=%s&hostname=%s&machine_guid=%s&os=%s&update_every=%d HTTP/1.1\r\n"
"User-Agent: netdata-push-service/%s\r\n"
"Accept: */*\r\n\r\n"
- , rrdpush_api_key
+ , host->rrdpush_api_key
, host->hostname
, host->machine_guid
, host->os
@@ -318,7 +322,7 @@ void *rrdpush_sender_thread(void *ptr) {
continue;
}
- if(strncmp(http, "STREAM", 6)) {
+ if(strncmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT))) {
close(host->rrdpush_socket);
host->rrdpush_socket = -1;
error("STREAM %s [send to %s]: server is not replying properly.", host->hostname, connected_to);
@@ -428,6 +432,9 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
int history = default_rrd_history_entries;
RRD_MEMORY_MODE mode = default_rrd_memory_mode;
int health_enabled = default_health_enabled;
+ int rrdpush_enabled = default_rrdpush_enabled;
+ char *rrdpush_destination = default_rrdpush_destination;
+ char *rrdpush_api_key = default_rrdpush_api_key;
time_t alarms_delay = 60;
update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
@@ -446,10 +453,30 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
alarms_delay = appconfig_get_number(&stream_config, key, "default postpone alarms on connect seconds", alarms_delay);
alarms_delay = appconfig_get_number(&stream_config, machine_guid, "postpone alarms on connect seconds", alarms_delay);
+ rrdpush_enabled = appconfig_get_boolean(&stream_config, key, "default proxy enabled", rrdpush_enabled);
+ rrdpush_enabled = appconfig_get_boolean(&stream_config, machine_guid, "proxy enabled", rrdpush_enabled);
+
+ rrdpush_destination = appconfig_get(&stream_config, key, "default proxy destination", rrdpush_destination);
+ rrdpush_destination = appconfig_get(&stream_config, machine_guid, "proxy destination", rrdpush_destination);
+
+ rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key);
+ rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key);
+
if(!strcmp(machine_guid, "localhost"))
host = localhost;
else
- host = rrdhost_find_or_create(hostname, machine_guid, os, update_every, history, mode, (health_enabled == CONFIG_BOOLEAN_NO)?0:1);
+ host = rrdhost_find_or_create(
+ hostname
+ , machine_guid
+ , os
+ , update_every
+ , history
+ , mode
+ , (health_enabled != CONFIG_BOOLEAN_NO)
+ , (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
+ , rrdpush_destination
+ , rrdpush_api_key
+ );
if(!host) {
close(fd);
@@ -457,7 +484,8 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
return 1;
}
- info("STREAM %s [receive from [%s]:%s]: metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s"
+#ifdef NETDATA_INTERNAL_CHECKS
+ info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s"
, hostname
, client_ip
, client_port
@@ -468,6 +496,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
, rrd_memory_mode_name(host->rrd_memory_mode)
, (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
);
+#endif // NETDATA_INTERNAL_CHECKS
struct plugind cd = {
.enabled = 1,
@@ -487,8 +516,8 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port);
info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port);
- if(send_timeout(fd, "STREAM", 6, 0, 60) != 6) {
- error("STREAM %s [receive from [%s]:%s]: cannot send STREAM command.", host->hostname, client_ip, client_port);
+ if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
+ error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port);
return 0;
}
@@ -510,9 +539,9 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m
rrdhost_unlock(host);
// call the plugins.d processor to receive the metrics
- info("STREAM %s [receive from [%s]:%s]: receiving metrics... (host '%s', machine GUID '%s').", host->hostname, client_ip, client_port, host->hostname, host->machine_guid);
+ info("STREAM %s [receive from [%s]:%s]: receiving metrics...", host->hostname, client_ip, client_port);
size_t count = pluginsd_process(host, &cd, fp, 1);
- error("STREAM %s [receive from [%s]:%s]: disconnected (host '%s', machine GUID '%s', completed updates %zu).", host->hostname, client_ip, client_port, host->hostname, host->machine_guid, count);
+ error("STREAM %s [receive from [%s]:%s]: disconnected (completed updates %zu).", host->hostname, client_ip, client_port, count);
rrdhost_wrlock(host);
host->use_counter--;
@@ -564,10 +593,6 @@ void *rrdpush_receiver_thread(void *ptr) {
return NULL;
}
-static inline int rrdpush_receive_validate_api_key(const char *key) {
- return appconfig_get_boolean(&stream_config, key, "enabled", 0);
-}
-
void rrdpush_sender_thread_spawn(RRDHOST *host) {
if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *)host))
error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
@@ -585,6 +610,7 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = NULL;
int update_every = default_rrd_update_every;
+ char buf[GUID_LEN + 1];
while(url) {
char *value = mystrsep(&url, "?&");
@@ -627,8 +653,22 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url
return 400;
}
- if(!rrdpush_receive_validate_api_key(key)) {
- error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
+ if(regenerate_guid(key, buf) == -1) {
+ error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID. Forbidding access.", w->client_ip, w->client_port, key);
+ buffer_flush(w->response.data);
+ buffer_sprintf(w->response.data, "Your API key is invalid.");
+ return 401;
+ }
+
+ if(regenerate_guid(machine_guid, buf) == -1) {
+ error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, key);
+ buffer_flush(w->response.data);
+ buffer_sprintf(w->response.data, "Your machine GUID is invalid.");
+ return 404;
+ }
+
+ if(!appconfig_get_boolean(&stream_config, key, "enabled", 1)) {
+ error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "Your API key is not permitted access.");
return 401;
diff --git a/src/rrdpush.h b/src/rrdpush.h
index 6a7d8e59a9..ef3545f8f4 100644
--- a/src/rrdpush.h
+++ b/src/rrdpush.h
@@ -2,6 +2,8 @@
#define NETDATA_RRDPUSH_H
extern int default_rrdpush_enabled;
+extern char *default_rrdpush_destination;
+extern char *default_rrdpush_api_key;
extern int rrdpush_init();
extern void rrdset_done_push(RRDSET *st);