summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Markos Fountoulakis <44345837+mfundul@users.noreply.github.com> 2020-03-16 13:39:00 +0200
committer GitHub <noreply@github.com> 2020-03-16 13:39:00 +0200
commit 161ba1592f5412bd378ace104979ddb66cd33973 (patch)
tree 3d278844c5a94b67661312f7138516d725ea6e0e
parent 87fd050461eac9e69f78a075ed896a1eaeabcd04 (diff)
Fix streaming scaling (#8375)
* Disallow multiple streaming connections to the same master agent
* Reject multiple streaming connections quickly without blocking
* Increase timeout for systemd service shutdown to give time to flush the db.
* Optimize page correlation ID to use atomic counter instead of locks
* Reduce contention in global configuration mutex
* Optimize complexity of inserting configuration sections from O(N) to O(1)
* Reduce overhead of clock_gettime() by utilizing CLOCK_MONOTONIC_COARSE when applicable.
* Fix unit test compile errors
-rw-r--r-- collectors/ebpf_process.plugin/ebpf_process.c | 2
-rw-r--r-- daemon/main.c | 4
-rwxr-xr-x database/engine/rrdengineapi.c | 4
-rw-r--r-- database/engine/rrdenginelib.h | 6
-rw-r--r-- database/rrdset.c | 22
-rw-r--r-- exporting/read_config.c | 3
-rw-r--r-- libnetdata/clocks/clocks.c | 23
-rw-r--r-- libnetdata/clocks/clocks.h | 14
-rw-r--r-- libnetdata/config/appconfig.c | 9
-rw-r--r-- libnetdata/config/appconfig.h | 3
-rw-r--r-- streaming/rrdpush.c | 109
-rw-r--r-- streaming/stream.conf | 14
-rw-r--r-- system/netdata.service.in | 2
-rw-r--r-- web/api/tests/valid_urls.c | 3
-rw-r--r-- web/api/tests/web_api.c | 3
15 files changed, 107 insertions, 114 deletions
diff --git a/collectors/ebpf_process.plugin/ebpf_process.c b/collectors/ebpf_process.plugin/ebpf_process.c
index 8c7a34aa1a..ce61cf0b99 100644
--- a/collectors/ebpf_process.plugin/ebpf_process.c
+++ b/collectors/ebpf_process.plugin/ebpf_process.c
@@ -809,7 +809,7 @@ static inline void set_log_file(char *ptr) {
}
static void set_global_values() {
- struct section *sec = collector_config.sections;
+ struct section *sec = collector_config.first_section;
while(sec) {
if(!strcasecmp(sec->name, "global")) {
struct config_option *values = sec->values;
diff --git a/daemon/main.c b/daemon/main.c
index f019cc6832..a0a26115d8 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -6,7 +6,8 @@ int netdata_zero_metrics_enabled;
int netdata_anonymous_statistics_enabled;
struct config netdata_config = {
- .sections = NULL,
+ .first_section = NULL,
+ .last_section = NULL,
.mutex = NETDATA_MUTEX_INITIALIZER,
.index = {
.avl_tree = {
@@ -1148,6 +1149,7 @@ int main(int argc, char **argv) {
mallopt(M_ARENA_MAX, 1);
#endif
test_clock_boottime();
+ test_clock_monotonic_coarse();
// prepare configuration environment variables for the plugins
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 07323a5112..bd6110ce38 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -175,9 +175,7 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
handle->descr = descr;
- uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
- handle->page_correlation_id = pg_cache->committed_page_index.latest_corr_id++;
- uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
+ handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1);
if (0 == rd->rrdset->rrddim_page_alignment) {
/* this is the leading dimension that defines chart alignment */
diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h
index 295f7df056..85836b97fe 100644
--- a/database/engine/rrdenginelib.h
+++ b/database/engine/rrdenginelib.h
@@ -27,11 +27,13 @@ struct rrdengine_instance;
typedef uintptr_t rrdeng_stats_t;
#ifdef __ATOMIC_RELAXED
-#define rrd_stat_atomic_add(p, n) do {(void) __atomic_fetch_add(p, n, __ATOMIC_RELAXED);} while(0)
+#define rrd_atomic_fetch_add(p, n) __atomic_fetch_add(p, n, __ATOMIC_RELAXED)
#else
-#define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0)
+#define rrd_atomic_fetch_add(p, n) __sync_fetch_and_add(p, n)
#endif
+#define rrd_stat_atomic_add(p, n) rrd_atomic_fetch_add(p, n)
+
#define RRDENG_PATH_MAX (4096)
/* returns old *ptr value */
diff --git a/database/rrdset.c b/database/rrdset.c
index 7d5b98dd48..ced1021ac7 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -550,19 +550,21 @@ RRDSET *rrdset_create_custom(
// ------------------------------------------------------------------------
// get the options from the config, we need to create it
- long rentries = config_get_number(config_section, "history", history_entries);
- long entries = align_entries_to_pagesize(memory_mode, rentries);
- if(entries != rentries) entries = config_set_number(config_section, "history", entries);
-
- if(memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries)
- entries = config_set_number(config_section, "history", 10);
-
+ long entries;
+ if(memory_mode == RRD_MEMORY_MODE_DBENGINE) {
+ // only sets it the first time
+ entries = config_get_number(config_section, "history", 5);
+ } else {
+ long rentries = config_get_number(config_section, "history", history_entries);
+ entries = align_entries_to_pagesize(memory_mode, rentries);
+ if (entries != rentries) entries = config_set_number(config_section, "history", entries);
+
+ if (memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries)
+ entries = config_set_number(config_section, "history", 10);
+ }
int enabled = config_get_boolean(config_section, "enabled", 1);
if(!enabled) entries = 5;
- if(memory_mode == RRD_MEMORY_MODE_DBENGINE)
- entries = config_set_number(config_section, "history", 5);
-
unsigned long size = sizeof(RRDSET);
char *cache_dir = rrdset_cache_dir(host, fullid, config_section);
diff --git a/exporting/read_config.c b/exporting/read_config.c
index cc18ab132c..4b27ad1ef1 100644
--- a/exporting/read_config.c
+++ b/exporting/read_config.c
@@ -2,7 +2,8 @@
#include "exporting_engine.h"
-struct config exporting_config = {.sections = NULL,
+struct config exporting_config = {.first_section = NULL,
+ .last_section = NULL,
.mutex = NETDATA_MUTEX_INITIALIZER,
.index = {.avl_tree = {.root = NULL, .compar = appconfig_section_compare},
.rwlock = AVL_LOCK_INITIALIZER}};
diff --git a/libnetdata/clocks/clocks.c b/libnetdata/clocks/clocks.c
index 161225a9b6..3875b2a255 100644
--- a/libnetdata/clocks/clocks.c
+++ b/libnetdata/clocks/clocks.c
@@ -3,6 +3,7 @@
#include "../libnetdata.h"
static int clock_boottime_valid = 1;
+static int clock_monotonic_coarse_valid = 1;
#ifndef HAVE_CLOCK_GETTIME
inline int clock_gettime(clockid_t clk_id, struct timespec *ts) {
@@ -23,6 +24,12 @@ void test_clock_boottime(void) {
clock_boottime_valid = 0;
}
+void test_clock_monotonic_coarse(void) {
+ struct timespec ts;
+ if(clock_gettime(CLOCK_MONOTONIC_COARSE, &ts) == -1 && errno == EINVAL)
+ clock_monotonic_coarse_valid = 0;
+}
+
static inline time_t now_sec(clockid_t clk_id) {
struct timespec ts;
if(unlikely(clock_gettime(clk_id, &ts) == -1)) {
@@ -69,27 +76,31 @@ inline int now_realtime_timeval(struct timeval *tv) {
}
inline time_t now_monotonic_sec(void) {
- return now_sec(CLOCK_MONOTONIC);
+ return now_sec(likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC);
}
inline usec_t now_monotonic_usec(void) {
- return now_usec(CLOCK_MONOTONIC);
+ return now_usec(likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC);
}
inline int now_monotonic_timeval(struct timeval *tv) {
- return now_timeval(CLOCK_MONOTONIC, tv);
+ return now_timeval(likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC, tv);
}
inline time_t now_boottime_sec(void) {
- return now_sec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : CLOCK_MONOTONIC);
+ return now_sec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME :
+ likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC);
}
inline usec_t now_boottime_usec(void) {
- return now_usec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : CLOCK_MONOTONIC);
+ return now_usec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME :
+ likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC);
}
inline int now_boottime_timeval(struct timeval *tv) {
- return now_timeval(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : CLOCK_MONOTONIC, tv);
+ return now_timeval(likely(clock_boottime_valid) ? CLOCK_BOOTTIME :
+ likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC,
+ tv);
}
inline usec_t timeval_usec(struct timeval *tv) {
diff --git a/libnetdata/clocks/clocks.h b/libnetdata/clocks/clocks.h
index 4af451d60a..7938cb0d36 100644
--- a/libnetdata/clocks/clocks.h
+++ b/libnetdata/clocks/clocks.h
@@ -36,6 +36,12 @@ typedef struct heartbeat {
#define CLOCK_MONOTONIC CLOCK_REALTIME
#endif
+/* Prefer CLOCK_MONOTONIC_COARSE where available to reduce overhead. It is monotonic like CLOCK_MONOTONIC, but faster to read and less precise (coarser resolution). */
+#ifndef CLOCK_MONOTONIC_COARSE
+/* fallback to CLOCK_MONOTONIC if not available */
+#define CLOCK_MONOTONIC_COARSE CLOCK_MONOTONIC
+#endif
+
#ifndef CLOCK_BOOTTIME
#ifdef CLOCK_UPTIME
@@ -43,7 +49,7 @@ typedef struct heartbeat {
#define CLOCK_BOOTTIME CLOCK_UPTIME
#else // CLOCK_UPTIME
/* CLOCK_BOOTTIME falls back to CLOCK_MONOTONIC */
-#define CLOCK_BOOTTIME CLOCK_MONOTONIC
+#define CLOCK_BOOTTIME CLOCK_MONOTONIC_COARSE
#endif // CLOCK_UPTIME
#else // CLOCK_BOOTTIME
@@ -136,6 +142,12 @@ extern int sleep_usec(usec_t usec);
*/
void test_clock_boottime(void);
+/*
+ * When running a binary built with CLOCK_MONOTONIC_COARSE defined on a system whose Linux kernel is older than 2.6.32, the
+ * clock_gettime(2) system call fails with EINVAL. In that case the code must fall back to CLOCK_MONOTONIC.
+ */
+void test_clock_monotonic_coarse(void);
+
extern collected_number uptime_msec(char *filename);
#endif /* NETDATA_CLOCKS_H */
diff --git a/libnetdata/config/appconfig.c b/libnetdata/config/appconfig.c
index c77cf34ec3..2fb21f1837 100644
--- a/libnetdata/config/appconfig.c
+++ b/libnetdata/config/appconfig.c
@@ -169,12 +169,13 @@ static inline struct section *appconfig_section_create(struct config *root, cons
error("INTERNAL ERROR: indexing of section '%s', already exists.", co->name);
appconfig_wrlock(root);
- struct section *co2 = root->sections;
+ struct section *co2 = root->last_section;
if(co2) {
- while (co2->next) co2 = co2->next;
co2->next = co;
+ } else {
+ root->first_section = co;
}
- else root->sections = co;
+ root->last_section = co;
appconfig_unlock(root);
return co;
@@ -678,7 +679,7 @@ void appconfig_generate(struct config *root, BUFFER *wb, int only_changed)
}
appconfig_wrlock(root);
- for(co = root->sections; co ; co = co->next) {
+ for(co = root->first_section; co ; co = co->next) {
if(!strcmp(co->name, CONFIG_SECTION_GLOBAL)
|| !strcmp(co->name, CONFIG_SECTION_WEB)
|| !strcmp(co->name, CONFIG_SECTION_STATSD)
diff --git a/libnetdata/config/appconfig.h b/libnetdata/config/appconfig.h
index 4688760514..d448205b63 100644
--- a/libnetdata/config/appconfig.h
+++ b/libnetdata/config/appconfig.h
@@ -141,7 +141,8 @@ struct section {
};
struct config {
- struct section *sections;
+ struct section *first_section;
+ struct section *last_section; // optimize inserting at the end
netdata_mutex_t mutex;
avl_tree_lock index;
};
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 79e4e114ba..0afe233531 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -44,7 +44,8 @@ typedef struct {
} stream_encoded_t;
static struct config stream_config = {
- .sections = NULL,
+ .first_section = NULL,
+ .last_section = NULL,
.mutex = NETDATA_MUTEX_INITIALIZER,
.index = {
.avl_tree = {
@@ -168,7 +169,7 @@ int configured_as_master() {
int is_master = 0;
appconfig_wrlock(&stream_config);
- for (section = stream_config.sections; section; section = section->next) {
+ for (section = stream_config.first_section; section; section = section->next) {
uuid_t uuid;
if (uuid_parse(section->name, uuid) != -1 &&
@@ -1080,35 +1081,6 @@ static void log_stream_connection(const char *client_ip, const char *client_port
log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
}
-static RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY get_multiple_connections_strategy(struct config *c, const char *section, const char *name, RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY def) {
- char *value;
- switch(def) {
- default:
- case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
- value = "allow";
- break;
-
- case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
- value = "deny";
- break;
- }
-
- value = appconfig_get(c, section, name, value);
-
- RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY ret = def;
-
- if(strcasecmp(value, "allow") == 0 || strcasecmp(value, "permit") == 0 || strcasecmp(value, "accept") == 0)
- ret = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
-
- else if(strcasecmp(value, "deny") == 0 || strcasecmp(value, "reject") == 0 || strcasecmp(value, "block") == 0)
- ret = RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW;
-
- else
- error("Invalid stream config value at section [%s], setting '%s', value '%s'", section, name, value);
-
- return ret;
-}
-
static int rrdpush_receive(int fd
, const char *key
, const char *hostname
@@ -1137,7 +1109,6 @@ static int rrdpush_receive(int fd
char *rrdpush_api_key = default_rrdpush_api_key;
char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
time_t alarms_delay = 60;
- RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY rrdpush_multiple_connections_strategy = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW;
update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
if(update_every < 0) update_every = 1;
@@ -1164,9 +1135,6 @@ static int rrdpush_receive(int fd
rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key);
rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key);
- rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, key, "multiple connections", rrdpush_multiple_connections_strategy);
- rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, machine_guid, "multiple connections", rrdpush_multiple_connections_strategy);
-
rrdpush_send_charts_matching = appconfig_get(&stream_config, key, "default proxy send charts matching", rrdpush_send_charts_matching);
rrdpush_send_charts_matching = appconfig_get(&stream_config, machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
@@ -1179,26 +1147,40 @@ static int rrdpush_receive(int fd
close(fd);
return 1;
}
- else
- host = rrdhost_find_or_create(
- hostname
- , registry_hostname
- , machine_guid
- , os
- , timezone
- , tags
- , program_name
- , program_version
- , update_every
- , history
- , mode
- , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
- , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
- , rrdpush_destination
- , rrdpush_api_key
- , rrdpush_send_charts_matching
- , system_info
- );
+
+ /*
+ * Quick path for rejecting multiple connections. Don't take any locks so that progress is made. The same
+ * condition will be checked again below, while holding the global and host writer locks. Any potential false
+ * positives will not cause harm. Data hazards with host deconstruction will be handled when reference counting
+ * is implemented.
+ */
+ host = rrdhost_find_by_guid(machine_guid, 0);
+ if(host && host->connected_senders > 0) {
+ log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port);
+ close(fd);
+ return 0;
+ }
+
+ host = rrdhost_find_or_create(
+ hostname
+ , registry_hostname
+ , machine_guid
+ , os
+ , timezone
+ , tags
+ , program_name
+ , program_version
+ , update_every
+ , history
+ , mode
+ , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
+ , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
+ , rrdpush_destination
+ , rrdpush_api_key
+ , rrdpush_send_charts_matching
+ , system_info
+ );
if(!host) {
close(fd);
@@ -1279,18 +1261,11 @@ static int rrdpush_receive(int fd
rrdhost_wrlock(host);
if(host->connected_senders > 0) {
- switch(rrdpush_multiple_connections_strategy) {
- case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW:
- info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port);
- break;
-
- case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW:
- rrdhost_unlock(host);
- log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED");
- info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port);
- fclose(fp);
- return 0;
- }
+ rrdhost_unlock(host);
+ log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED");
+ info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port);
+ fclose(fp);
+ return 0;
}
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
diff --git a/streaming/stream.conf b/streaming/stream.conf
index 58dbcd8e68..e4d05315d5 100644
--- a/streaming/stream.conf
+++ b/streaming/stream.conf
@@ -149,13 +149,6 @@
# postpone alarms for a short period after the sender is connected
default postpone alarms on connect seconds = 60
- # allow or deny multiple connections for the same host?
- # If you are sure all your netdata have their own machine GUID,
- # set this to 'allow', since it allows faster reconnects.
- # When set to 'deny', new connections for a host will not be
- # accepted until an existing connection is cleared.
- multiple connections = allow
-
# need to route metrics differently? set these.
# the defaults are the ones at the [stream] section (above)
#default proxy enabled = yes | no
@@ -204,13 +197,6 @@
# postpone alarms when the sender connects
postpone alarms on connect seconds = 60
- # allow or deny multiple connections for the same host?
- # If you are sure all your netdata have their own machine GUID,
- # set this to 'allow', since it allows faster reconnects.
- # When set to 'deny', new connections for a host will not be
- # accepted until an existing connection is cleared.
- multiple connections = allow
-
# need to route metrics differently?
# the defaults are the ones at the [API KEY] section
#proxy enabled = yes | no
diff --git a/system/netdata.service.in b/system/netdata.service.in
index 3abb16460c..e9edb229cf 100644
--- a/system/netdata.service.in
+++ b/system/netdata.service.in
@@ -21,7 +21,7 @@ ExecStartPre=/bin/chown -R netdata:netdata @localstatedir_POST@/run/netdata
PermissionsStartOnly=true
# saving a big db on slow disks may need some time
-TimeoutStopSec=60
+TimeoutStopSec=150
# restart netdata if it crashes
Restart=on-failure
diff --git a/web/api/tests/valid_urls.c b/web/api/tests/valid_urls.c
index c6032a5e97..cde982f847 100644
--- a/web/api/tests/valid_urls.c
+++ b/web/api/tests/valid_urls.c
@@ -182,7 +182,8 @@ WEB_SERVER_MODE web_server_mode = WEB_SERVER_MODE_STATIC_THREADED;
char *netdata_configured_web_dir = "UNKNOWN FIXME";
RRDHOST *localhost = NULL;
-struct config netdata_config = { .sections = NULL,
+struct config netdata_config = { .first_section = NULL,
+ .last_section = NULL,
.mutex = NETDATA_MUTEX_INITIALIZER,
.index = { .avl_tree = { .root = NULL, .compar = appconfig_section_compare },
.rwlock = AVL_LOCK_INITIALIZER } };
diff --git a/web/api/tests/web_api.c b/web/api/tests/web_api.c
index ef9fea0cf8..4a1c58a824 100644
--- a/web/api/tests/web_api.c
+++ b/web/api/tests/web_api.c
@@ -184,7 +184,8 @@ WEB_SERVER_MODE web_server_mode = WEB_SERVER_MODE_STATIC_THREADED;
char *netdata_configured_web_dir = "UNKNOWN FIXME";
RRDHOST *localhost = NULL;
-struct config netdata_config = { .sections = NULL,
+struct config netdata_config = { .first_section = NULL,
+ .last_section = NULL,
.mutex = NETDATA_MUTEX_INITIALIZER,
.index = { .avl_tree = { .root = NULL, .compar = appconfig_section_compare },
.rwlock = AVL_LOCK_INITIALIZER } };