summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorMarkos Fountoulakis <44345837+mfundul@users.noreply.github.com>2020-03-16 13:39:00 +0200
committerGitHub <noreply@github.com>2020-03-16 13:39:00 +0200
commit161ba1592f5412bd378ace104979ddb66cd33973 (patch)
tree3d278844c5a94b67661312f7138516d725ea6e0e /streaming
parent87fd050461eac9e69f78a075ed896a1eaeabcd04 (diff)
Fix streaming scaling (#8375)
* Disallow multiple streaming connections to the same master agent * Reject multiple streaming connections quickly without blocking * Increase timeout for systemd service shutdown to give time to flush the db. * Optimize page correlation ID to use atomic counter instead of locks * Reduce contention in global configuration mutex * Optimize complexity of inserting configuration sections from O(N) to O(1) * Reduce overhead of clockgettime() by utilizing CLOCK_MONOTONIC_COARSE when applicable. * Fix unit test compile errors
Diffstat (limited to 'streaming')
-rw-r--r--streaming/rrdpush.c109
-rw-r--r--streaming/stream.conf14
2 files changed, 42 insertions, 81 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 79e4e114ba..0afe233531 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -44,7 +44,8 @@ typedef struct {
} stream_encoded_t;
static struct config stream_config = {
- .sections = NULL,
+ .first_section = NULL,
+ .last_section = NULL,
.mutex = NETDATA_MUTEX_INITIALIZER,
.index = {
.avl_tree = {
@@ -168,7 +169,7 @@ int configured_as_master() {
int is_master = 0;
appconfig_wrlock(&stream_config);
- for (section = stream_config.sections; section; section = section->next) {
+ for (section = stream_config.first_section; section; section = section->next) {
uuid_t uuid;
if (uuid_parse(section->name, uuid) != -1 &&
@@ -1080,35 +1081,6 @@ static void log_stream_connection(const char *client_ip, const char *client_port
log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
}
-static RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY get_multiple_connections_strategy(struct config *c, const char *section, const char *name, RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY def) {
- char *value;
- switch(def) {
- default:
- case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
- value = "allow";
- break;
-
- case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
- value = "deny";
- break;
- }
-
- value = appconfig_get(c, section, name, value);
-
- RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY ret = def;
-
- if(strcasecmp(value, "allow") == 0 || strcasecmp(value, "permit") == 0 || strcasecmp(value, "accept") == 0)
- ret = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
-
- else if(strcasecmp(value, "deny") == 0 || strcasecmp(value, "reject") == 0 || strcasecmp(value, "block") == 0)
- ret = RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW;
-
- else
- error("Invalid stream config value at section [%s], setting '%s', value '%s'", section, name, value);
-
- return ret;
-}
-
static int rrdpush_receive(int fd
, const char *key
, const char *hostname
@@ -1137,7 +1109,6 @@ static int rrdpush_receive(int fd
char *rrdpush_api_key = default_rrdpush_api_key;
char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
time_t alarms_delay = 60;
- RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY rrdpush_multiple_connections_strategy = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
if(update_every < 0) update_every = 1;
@@ -1164,9 +1135,6 @@ static int rrdpush_receive(int fd
rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key);
rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key);
- rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, key, "multiple connections", rrdpush_multiple_connections_strategy);
- rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, machine_guid, "multiple connections", rrdpush_multiple_connections_strategy);
-
rrdpush_send_charts_matching = appconfig_get(&stream_config, key, "default proxy send charts matching", rrdpush_send_charts_matching);
rrdpush_send_charts_matching = appconfig_get(&stream_config, machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
@@ -1179,26 +1147,40 @@ static int rrdpush_receive(int fd
close(fd);
return 1;
}
- else
- host = rrdhost_find_or_create(
- hostname
- , registry_hostname
- , machine_guid
- , os
- , timezone
- , tags
- , program_name
- , program_version
- , update_every
- , history
- , mode
- , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
- , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
- , rrdpush_destination
- , rrdpush_api_key
- , rrdpush_send_charts_matching
- , system_info
- );
+
+ /*
+ * Quick path for rejecting multiple connections. Don't take any locks so that progress is made. The same
+ * condition will be checked again below, while holding the global and host writer locks. Any potential false
+ * positives will not cause harm. Data hazards with host deconstruction will be handled when reference counting
+ * is implemented.
+ */
+ host = rrdhost_find_by_guid(machine_guid, 0);
+ if(host && host->connected_senders > 0) {
+ log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port);
+ close(fd);
+ return 0;
+ }
+
+ host = rrdhost_find_or_create(
+ hostname
+ , registry_hostname
+ , machine_guid
+ , os
+ , timezone
+ , tags
+ , program_name
+ , program_version
+ , update_every
+ , history
+ , mode
+ , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
+ , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
+ , rrdpush_destination
+ , rrdpush_api_key
+ , rrdpush_send_charts_matching
+ , system_info
+ );
if(!host) {
close(fd);
@@ -1279,18 +1261,11 @@ static int rrdpush_receive(int fd
rrdhost_wrlock(host);
if(host->connected_senders > 0) {
- switch(rrdpush_multiple_connections_strategy) {
- case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
- info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port);
- break;
-
- case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
- rrdhost_unlock(host);
- log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED");
- info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port);
- fclose(fp);
- return 0;
- }
+ rrdhost_unlock(host);
+ log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port);
+ fclose(fp);
+ return 0;
}
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
diff --git a/streaming/stream.conf b/streaming/stream.conf
index 58dbcd8e68..e4d05315d5 100644
--- a/streaming/stream.conf
+++ b/streaming/stream.conf
@@ -149,13 +149,6 @@
# postpone alarms for a short period after the sender is connected
default postpone alarms on connect seconds = 60
- # allow or deny multiple connections for the same host?
- # If you are sure all your netdata have their own machine GUID,
- # set this to 'allow', since it allows faster reconnects.
- # When set to 'deny', new connections for a host will not be
- # accepted until an existing connection is cleared.
- multiple connections = allow
-
# need to route metrics differently? set these.
# the defaults are the ones at the [stream] section (above)
#default proxy enabled = yes | no
@@ -204,13 +197,6 @@
# postpone alarms when the sender connects
postpone alarms on connect seconds = 60
- # allow or deny multiple connections for the same host?
- # If you are sure all your netdata have their own machine GUID,
- # set this to 'allow', since it allows faster reconnects.
- # When set to 'deny', new connections for a host will not be
- # accepted until an existing connection is cleared.
- multiple connections = allow
-
# need to route metrics differently?
# the defaults are the ones at the [API KEY] section
#proxy enabled = yes | no