diff options
author | Markos Fountoulakis <44345837+mfundul@users.noreply.github.com> | 2020-03-16 13:39:00 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-16 13:39:00 +0200 |
commit | 161ba1592f5412bd378ace104979ddb66cd33973 (patch) | |
tree | 3d278844c5a94b67661312f7138516d725ea6e0e | |
parent | 87fd050461eac9e69f78a075ed896a1eaeabcd04 (diff) |
Fix streaming scaling (#8375)
* Disallow multiple streaming connections to the same master agent
* Reject multiple streaming connections quickly without blocking
* Increase timeout for systemd service shutdown to give time to flush the db.
* Optimize page correlation ID to use atomic counter instead of locks
* Reduce contention in global configuration mutex
* Optimize complexity of inserting configuration sections from O(N) to O(1)
* Reduce overhead of clock_gettime() by utilizing CLOCK_MONOTONIC_COARSE when applicable.
* Fix unit test compile errors
-rw-r--r-- | collectors/ebpf_process.plugin/ebpf_process.c | 2 | ||||
-rw-r--r-- | daemon/main.c | 4 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 4 | ||||
-rw-r--r-- | database/engine/rrdenginelib.h | 6 | ||||
-rw-r--r-- | database/rrdset.c | 22 | ||||
-rw-r--r-- | exporting/read_config.c | 3 | ||||
-rw-r--r-- | libnetdata/clocks/clocks.c | 23 | ||||
-rw-r--r-- | libnetdata/clocks/clocks.h | 14 | ||||
-rw-r--r-- | libnetdata/config/appconfig.c | 9 | ||||
-rw-r--r-- | libnetdata/config/appconfig.h | 3 | ||||
-rw-r--r-- | streaming/rrdpush.c | 109 | ||||
-rw-r--r-- | streaming/stream.conf | 14 | ||||
-rw-r--r-- | system/netdata.service.in | 2 | ||||
-rw-r--r-- | web/api/tests/valid_urls.c | 3 | ||||
-rw-r--r-- | web/api/tests/web_api.c | 3 |
15 files changed, 107 insertions, 114 deletions
diff --git a/collectors/ebpf_process.plugin/ebpf_process.c b/collectors/ebpf_process.plugin/ebpf_process.c index 8c7a34aa1a..ce61cf0b99 100644 --- a/collectors/ebpf_process.plugin/ebpf_process.c +++ b/collectors/ebpf_process.plugin/ebpf_process.c @@ -809,7 +809,7 @@ static inline void set_log_file(char *ptr) { } static void set_global_values() { - struct section *sec = collector_config.sections; + struct section *sec = collector_config.first_section; while(sec) { if(!strcasecmp(sec->name, "global")) { struct config_option *values = sec->values; diff --git a/daemon/main.c b/daemon/main.c index f019cc6832..a0a26115d8 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -6,7 +6,8 @@ int netdata_zero_metrics_enabled; int netdata_anonymous_statistics_enabled; struct config netdata_config = { - .sections = NULL, + .first_section = NULL, + .last_section = NULL, .mutex = NETDATA_MUTEX_INITIALIZER, .index = { .avl_tree = { @@ -1148,6 +1149,7 @@ int main(int argc, char **argv) { mallopt(M_ARENA_MAX, 1); #endif test_clock_boottime(); + test_clock_monotonic_coarse(); // prepare configuration environment variables for the plugins diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 07323a5112..bd6110ce38 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -175,9 +175,7 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n handle->descr = descr; - uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); - handle->page_correlation_id = pg_cache->committed_page_index.latest_corr_id++; - uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1); if (0 == rd->rrdset->rrddim_page_alignment) { /* this is the leading dimension that defines chart alignment */ diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h index 295f7df056..85836b97fe 100644 --- 
a/database/engine/rrdenginelib.h +++ b/database/engine/rrdenginelib.h @@ -27,11 +27,13 @@ struct rrdengine_instance; typedef uintptr_t rrdeng_stats_t; #ifdef __ATOMIC_RELAXED -#define rrd_stat_atomic_add(p, n) do {(void) __atomic_fetch_add(p, n, __ATOMIC_RELAXED);} while(0) +#define rrd_atomic_fetch_add(p, n) __atomic_fetch_add(p, n, __ATOMIC_RELAXED) #else -#define rrd_stat_atomic_add(p, n) do {(void) __sync_fetch_and_add(p, n);} while(0) +#define rrd_atomic_fetch_add(p, n) __sync_fetch_and_add(p, n) #endif +#define rrd_stat_atomic_add(p, n) rrd_atomic_fetch_add(p, n) + #define RRDENG_PATH_MAX (4096) /* returns old *ptr value */ diff --git a/database/rrdset.c b/database/rrdset.c index 7d5b98dd48..ced1021ac7 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -550,19 +550,21 @@ RRDSET *rrdset_create_custom( // ------------------------------------------------------------------------ // get the options from the config, we need to create it - long rentries = config_get_number(config_section, "history", history_entries); - long entries = align_entries_to_pagesize(memory_mode, rentries); - if(entries != rentries) entries = config_set_number(config_section, "history", entries); - - if(memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries) - entries = config_set_number(config_section, "history", 10); - + long entries; + if(memory_mode == RRD_MEMORY_MODE_DBENGINE) { + // only sets it the first time + entries = config_get_number(config_section, "history", 5); + } else { + long rentries = config_get_number(config_section, "history", history_entries); + entries = align_entries_to_pagesize(memory_mode, rentries); + if (entries != rentries) entries = config_set_number(config_section, "history", entries); + + if (memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries) + entries = config_set_number(config_section, "history", 10); + } int enabled = config_get_boolean(config_section, "enabled", 1); if(!enabled) entries = 5; - if(memory_mode == 
RRD_MEMORY_MODE_DBENGINE) - entries = config_set_number(config_section, "history", 5); - unsigned long size = sizeof(RRDSET); char *cache_dir = rrdset_cache_dir(host, fullid, config_section); diff --git a/exporting/read_config.c b/exporting/read_config.c index cc18ab132c..4b27ad1ef1 100644 --- a/exporting/read_config.c +++ b/exporting/read_config.c @@ -2,7 +2,8 @@ #include "exporting_engine.h" -struct config exporting_config = {.sections = NULL, +struct config exporting_config = {.first_section = NULL, + .last_section = NULL, .mutex = NETDATA_MUTEX_INITIALIZER, .index = {.avl_tree = {.root = NULL, .compar = appconfig_section_compare}, .rwlock = AVL_LOCK_INITIALIZER}}; diff --git a/libnetdata/clocks/clocks.c b/libnetdata/clocks/clocks.c index 161225a9b6..3875b2a255 100644 --- a/libnetdata/clocks/clocks.c +++ b/libnetdata/clocks/clocks.c @@ -3,6 +3,7 @@ #include "../libnetdata.h" static int clock_boottime_valid = 1; +static int clock_monotonic_coarse_valid = 1; #ifndef HAVE_CLOCK_GETTIME inline int clock_gettime(clockid_t clk_id, struct timespec *ts) { @@ -23,6 +24,12 @@ void test_clock_boottime(void) { clock_boottime_valid = 0; } +void test_clock_monotonic_coarse(void) { + struct timespec ts; + if(clock_gettime(CLOCK_MONOTONIC_COARSE, &ts) == -1 && errno == EINVAL) + clock_monotonic_coarse_valid = 0; +} + static inline time_t now_sec(clockid_t clk_id) { struct timespec ts; if(unlikely(clock_gettime(clk_id, &ts) == -1)) { @@ -69,27 +76,31 @@ inline int now_realtime_timeval(struct timeval *tv) { } inline time_t now_monotonic_sec(void) { - return now_sec(CLOCK_MONOTONIC); + return now_sec(likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC); } inline usec_t now_monotonic_usec(void) { - return now_usec(CLOCK_MONOTONIC); + return now_usec(likely(clock_monotonic_coarse_valid) ? 
CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC); } inline int now_monotonic_timeval(struct timeval *tv) { - return now_timeval(CLOCK_MONOTONIC, tv); + return now_timeval(likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC, tv); } inline time_t now_boottime_sec(void) { - return now_sec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : CLOCK_MONOTONIC); + return now_sec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : + likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC); } inline usec_t now_boottime_usec(void) { - return now_usec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : CLOCK_MONOTONIC); + return now_usec(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : + likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC); } inline int now_boottime_timeval(struct timeval *tv) { - return now_timeval(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : CLOCK_MONOTONIC, tv); + return now_timeval(likely(clock_boottime_valid) ? CLOCK_BOOTTIME : + likely(clock_monotonic_coarse_valid) ? CLOCK_MONOTONIC_COARSE : CLOCK_MONOTONIC, + tv); } inline usec_t timeval_usec(struct timeval *tv) { diff --git a/libnetdata/clocks/clocks.h b/libnetdata/clocks/clocks.h index 4af451d60a..7938cb0d36 100644 --- a/libnetdata/clocks/clocks.h +++ b/libnetdata/clocks/clocks.h @@ -36,6 +36,12 @@ typedef struct heartbeat { #define CLOCK_MONOTONIC CLOCK_REALTIME #endif +/* Prefer CLOCK_MONOTONIC_COARSE where available to reduce overhead. 
It has the same semantics as CLOCK_MONOTONIC */ +#ifndef CLOCK_MONOTONIC_COARSE +/* fallback to CLOCK_MONOTONIC if not available */ +#define CLOCK_MONOTONIC_COARSE CLOCK_MONOTONIC +#endif + #ifndef CLOCK_BOOTTIME #ifdef CLOCK_UPTIME @@ -43,7 +49,7 @@ typedef struct heartbeat { #define CLOCK_BOOTTIME CLOCK_UPTIME #else // CLOCK_UPTIME /* CLOCK_BOOTTIME falls back to CLOCK_MONOTONIC */ -#define CLOCK_BOOTTIME CLOCK_MONOTONIC +#define CLOCK_BOOTTIME CLOCK_MONOTONIC_COARSE #endif // CLOCK_UPTIME #else // CLOCK_BOOTTIME @@ -136,6 +142,12 @@ extern int sleep_usec(usec_t usec); */ void test_clock_boottime(void); +/* + * When running a binary with CLOCK_MONOTONIC_COARSE defined on a system with a linux kernel older than Linux 2.6.32 the + * clock_gettime(2) system call fails with EINVAL. In that case it must fall-back to CLOCK_MONOTONIC. + */ +void test_clock_monotonic_coarse(void); + extern collected_number uptime_msec(char *filename); #endif /* NETDATA_CLOCKS_H */ diff --git a/libnetdata/config/appconfig.c b/libnetdata/config/appconfig.c index c77cf34ec3..2fb21f1837 100644 --- a/libnetdata/config/appconfig.c +++ b/libnetdata/config/appconfig.c @@ -169,12 +169,13 @@ static inline struct section *appconfig_section_create(struct config *root, cons error("INTERNAL ERROR: indexing of section '%s', already exists.", co->name); appconfig_wrlock(root); - struct section *co2 = root->sections; + struct section *co2 = root->last_section; if(co2) { - while (co2->next) co2 = co2->next; co2->next = co; + } else { + root->first_section = co; } - else root->sections = co; + root->last_section = co; appconfig_unlock(root); return co; @@ -678,7 +679,7 @@ void appconfig_generate(struct config *root, BUFFER *wb, int only_changed) } appconfig_wrlock(root); - for(co = root->sections; co ; co = co->next) { + for(co = root->first_section; co ; co = co->next) { if(!strcmp(co->name, CONFIG_SECTION_GLOBAL) || !strcmp(co->name, CONFIG_SECTION_WEB) || !strcmp(co->name, CONFIG_SECTION_STATSD) diff 
--git a/libnetdata/config/appconfig.h b/libnetdata/config/appconfig.h index 4688760514..d448205b63 100644 --- a/libnetdata/config/appconfig.h +++ b/libnetdata/config/appconfig.h @@ -141,7 +141,8 @@ struct section { }; struct config { - struct section *sections; + struct section *first_section; + struct section *last_section; // optimize inserting at the end netdata_mutex_t mutex; avl_tree_lock index; }; diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 79e4e114ba..0afe233531 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -44,7 +44,8 @@ typedef struct { } stream_encoded_t; static struct config stream_config = { - .sections = NULL, + .first_section = NULL, + .last_section = NULL, .mutex = NETDATA_MUTEX_INITIALIZER, .index = { .avl_tree = { @@ -168,7 +169,7 @@ int configured_as_master() { int is_master = 0; appconfig_wrlock(&stream_config); - for (section = stream_config.sections; section; section = section->next) { + for (section = stream_config.first_section; section; section = section->next) { uuid_t uuid; if (uuid_parse(section->name, uuid) != -1 && @@ -1080,35 +1081,6 @@ static void log_stream_connection(const char *client_ip, const char *client_port log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid); } -static RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY get_multiple_connections_strategy(struct config *c, const char *section, const char *name, RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY def) { - char *value; - switch(def) { - default: - case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW: - value = "allow"; - break; - - case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW: - value = "deny"; - break; - } - - value = appconfig_get(c, section, name, value); - - RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY ret = def; - - if(strcasecmp(value, "allow") == 0 || strcasecmp(value, "permit") == 0 || strcasecmp(value, "accept") == 0) - ret = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW; - - else 
if(strcasecmp(value, "deny") == 0 || strcasecmp(value, "reject") == 0 || strcasecmp(value, "block") == 0) - ret = RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW; - - else - error("Invalid stream config value at section [%s], setting '%s', value '%s'", section, name, value); - - return ret; -} - static int rrdpush_receive(int fd , const char *key , const char *hostname @@ -1137,7 +1109,6 @@ static int rrdpush_receive(int fd char *rrdpush_api_key = default_rrdpush_api_key; char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; time_t alarms_delay = 60; - RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY rrdpush_multiple_connections_strategy = RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW; update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every); if(update_every < 0) update_every = 1; @@ -1164,9 +1135,6 @@ static int rrdpush_receive(int fd rrdpush_api_key = appconfig_get(&stream_config, key, "default proxy api key", rrdpush_api_key); rrdpush_api_key = appconfig_get(&stream_config, machine_guid, "proxy api key", rrdpush_api_key); - rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, key, "multiple connections", rrdpush_multiple_connections_strategy); - rrdpush_multiple_connections_strategy = get_multiple_connections_strategy(&stream_config, machine_guid, "multiple connections", rrdpush_multiple_connections_strategy); - rrdpush_send_charts_matching = appconfig_get(&stream_config, key, "default proxy send charts matching", rrdpush_send_charts_matching); rrdpush_send_charts_matching = appconfig_get(&stream_config, machine_guid, "proxy send charts matching", rrdpush_send_charts_matching); @@ -1179,26 +1147,40 @@ static int rrdpush_receive(int fd close(fd); return 1; } - else - host = rrdhost_find_or_create( - hostname - , registry_hostname - , machine_guid - , os - , timezone - , tags - , program_name - , program_version - , update_every - , history - , mode - , (unsigned int)(health_enabled != 
CONFIG_BOOLEAN_NO) - , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) - , rrdpush_destination - , rrdpush_api_key - , rrdpush_send_charts_matching - , system_info - ); + + /* + * Quick path for rejecting multiple connections. Don't take any locks so that progress is made. The same + * condition will be checked again below, while holding the global and host writer locks. Any potential false + * positives will not cause harm. Data hazards with host deconstruction will be handled when reference counting + * is implemented. + */ + host = rrdhost_find_by_guid(machine_guid, 0); + if(host && host->connected_senders > 0) { + log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED"); + info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port); + close(fd); + return 0; + } + + host = rrdhost_find_or_create( + hostname + , registry_hostname + , machine_guid + , os + , timezone + , tags + , program_name + , program_version + , update_every + , history + , mode + , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO) + , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) + , rrdpush_destination + , rrdpush_api_key + , rrdpush_send_charts_matching + , system_info + ); if(!host) { close(fd); @@ -1279,18 +1261,11 @@ static int rrdpush_receive(int fd rrdhost_wrlock(host); if(host->connected_senders > 0) { - switch(rrdpush_multiple_connections_strategy) { - case RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW: - info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. 
If multiple netdata are pushing metrics for the same charts, at the same time, the result is unexpected.", host->hostname, client_ip, client_port); - break; - - case RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW: - rrdhost_unlock(host); - log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED"); - info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port); - fclose(fp); - return 0; - } + rrdhost_unlock(host); + log_stream_connection(client_ip, client_port, key, host->machine_guid, host->hostname, "REJECTED - ALREADY CONNECTED"); + info("STREAM %s [receive from [%s]:%s]: multiple streaming connections for the same host detected. Rejecting new connection.", host->hostname, client_ip, client_port); + fclose(fp); + return 0; } rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); diff --git a/streaming/stream.conf b/streaming/stream.conf index 58dbcd8e68..e4d05315d5 100644 --- a/streaming/stream.conf +++ b/streaming/stream.conf @@ -149,13 +149,6 @@ # postpone alarms for a short period after the sender is connected default postpone alarms on connect seconds = 60 - # allow or deny multiple connections for the same host? - # If you are sure all your netdata have their own machine GUID, - # set this to 'allow', since it allows faster reconnects. - # When set to 'deny', new connections for a host will not be - # accepted until an existing connection is cleared. - multiple connections = allow - # need to route metrics differently? set these. # the defaults are the ones at the [stream] section (above) #default proxy enabled = yes | no @@ -204,13 +197,6 @@ # postpone alarms when the sender connects postpone alarms on connect seconds = 60 - # allow or deny multiple connections for the same host? - # If you are sure all your netdata have their own machine GUID, - # set this to 'allow', since it allows faster reconnects. 
- # When set to 'deny', new connections for a host will not be - # accepted until an existing connection is cleared. - multiple connections = allow - # need to route metrics differently? # the defaults are the ones at the [API KEY] section #proxy enabled = yes | no diff --git a/system/netdata.service.in b/system/netdata.service.in index 3abb16460c..e9edb229cf 100644 --- a/system/netdata.service.in +++ b/system/netdata.service.in @@ -21,7 +21,7 @@ ExecStartPre=/bin/chown -R netdata:netdata @localstatedir_POST@/run/netdata PermissionsStartOnly=true # saving a big db on slow disks may need some time -TimeoutStopSec=60 +TimeoutStopSec=150 # restart netdata if it crashes Restart=on-failure diff --git a/web/api/tests/valid_urls.c b/web/api/tests/valid_urls.c index c6032a5e97..cde982f847 100644 --- a/web/api/tests/valid_urls.c +++ b/web/api/tests/valid_urls.c @@ -182,7 +182,8 @@ WEB_SERVER_MODE web_server_mode = WEB_SERVER_MODE_STATIC_THREADED; char *netdata_configured_web_dir = "UNKNOWN FIXME"; RRDHOST *localhost = NULL; -struct config netdata_config = { .sections = NULL, +struct config netdata_config = { .first_section = NULL, + .last_section = NULL, .mutex = NETDATA_MUTEX_INITIALIZER, .index = { .avl_tree = { .root = NULL, .compar = appconfig_section_compare }, .rwlock = AVL_LOCK_INITIALIZER } }; diff --git a/web/api/tests/web_api.c b/web/api/tests/web_api.c index ef9fea0cf8..4a1c58a824 100644 --- a/web/api/tests/web_api.c +++ b/web/api/tests/web_api.c @@ -184,7 +184,8 @@ WEB_SERVER_MODE web_server_mode = WEB_SERVER_MODE_STATIC_THREADED; char *netdata_configured_web_dir = "UNKNOWN FIXME"; RRDHOST *localhost = NULL; -struct config netdata_config = { .sections = NULL, +struct config netdata_config = { .first_section = NULL, + .last_section = NULL, .mutex = NETDATA_MUTEX_INITIALIZER, .index = { .avl_tree = { .root = NULL, .compar = appconfig_section_compare }, .rwlock = AVL_LOCK_INITIALIZER } }; |