diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-20 23:47:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-20 23:47:53 +0200 |
commit | 284f6f3aa4f36cefad2601c490510621496c2b53 (patch) | |
tree | 97a7d55627ef7477f431c53a20d0e6f1f738a419 /libnetdata | |
parent | 2d02484954f68bf7e3015cb649e2f10a9f3c5c95 (diff) |
streaming compression, query planner and replication fixes (#14023)
* streaming compression, query planner and replication fixes
* remove journal v2 stats from global statistics
* disable sql for checking past sql UUIDs
* single threaded replication
* final replication thread using dictionaries and JudyL for sorting the pending requests
* do not timeout the sending socket when there are pending replication requests
* streaming receiver using read() instead of fread()
* remove FILE * from streaming - now using posix read() and write()
* increase timeouts to 10 minutes
* apply sender timeout only when there are metrics that are supposed to be streamed
* error handling in replication
* remove retries on socket read timeout; better error messages
* take into account inbound traffic too to detect that a connection is stale
* remove race conditions from replication thread
* make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed
* 2 minutes timeout to retry streaming to a parent that already has this node
* remove unnecessary condition check
* fix compilation warnings
* include judy in replication
* wrappers to handle retries for SSL_read and SSL_write
* compressed bytes read monitoring
* recursive locks on replication to make it faster during flush or cleanup
* replication completion chart at the receiver side
* simplified recursive mutex
* simplified recursive mutex again
Diffstat (limited to 'libnetdata')
-rw-r--r-- | libnetdata/arrayalloc/arrayalloc.c | 2 | ||||
-rw-r--r-- | libnetdata/dictionary/dictionary.c | 9 | ||||
-rw-r--r-- | libnetdata/dictionary/dictionary.h | 8 | ||||
-rw-r--r-- | libnetdata/ebpf/ebpf.c | 2 | ||||
-rw-r--r-- | libnetdata/inlined.h | 2 | ||||
-rw-r--r-- | libnetdata/socket/socket.c | 57 | ||||
-rw-r--r-- | libnetdata/socket/socket.h | 2 | ||||
-rw-r--r-- | libnetdata/worker_utilization/worker_utilization.c | 24 | ||||
-rw-r--r-- | libnetdata/worker_utilization/worker_utilization.h | 3 |
9 files changed, 87 insertions, 22 deletions
diff --git a/libnetdata/arrayalloc/arrayalloc.c b/libnetdata/arrayalloc/arrayalloc.c index f4d3cb0312..f337279aea 100644 --- a/libnetdata/arrayalloc/arrayalloc.c +++ b/libnetdata/arrayalloc/arrayalloc.c @@ -344,7 +344,7 @@ void arrayalloc_freez(ARAL *ar, void *ptr) { #endif } -#ifdef NETDATA_INTERNAL_CHECKS +#ifdef NETDATA_ARRAYALLOC_INTERNAL_CHECKS { // find the page ptr belongs ARAL_PAGE *page2 = find_page_with_allocation_internal_check(ar, ptr); diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c index e362acdd8b..29456200a9 100644 --- a/libnetdata/dictionary/dictionary.c +++ b/libnetdata/dictionary/dictionary.c @@ -2171,11 +2171,13 @@ int dictionary_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const D // ---------------------------------------------------------------------------- // sorted walkthrough +typedef int (*qsort_compar)(const void *item1, const void *item2); + static int dictionary_sort_compar(const void *item1, const void *item2) { return strcmp(item_get_name((*(DICTIONARY_ITEM **)item1)), item_get_name((*(DICTIONARY_ITEM **)item2))); } -int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const DICTIONARY_ITEM *item, void *entry, void *data), void *data) { +int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const DICTIONARY_ITEM *item, void *entry, void *data), void *data, dictionary_sorted_compar compar) { if(unlikely(!dict || !callback)) return 0; if(unlikely(is_dictionary_destroyed(dict))) { @@ -2200,7 +2202,10 @@ int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)( if(unlikely(i != entries)) entries = i; - qsort(array, entries, sizeof(DICTIONARY_ITEM *), dictionary_sort_compar); + if(compar) + qsort(array, entries, sizeof(DICTIONARY_ITEM *), (qsort_compar)compar); + else + qsort(array, entries, sizeof(DICTIONARY_ITEM *), dictionary_sort_compar); bool callit = true; int ret = 0, r; diff --git 
a/libnetdata/dictionary/dictionary.h b/libnetdata/dictionary/dictionary.h index d002385cb3..4e2ba751fa 100644 --- a/libnetdata/dictionary/dictionary.h +++ b/libnetdata/dictionary/dictionary.h @@ -230,9 +230,11 @@ size_t dictionary_acquired_item_references(DICT_ITEM_CONST DICTIONARY_ITEM *item #define dictionary_walkthrough_write(dict, callback, data) dictionary_walkthrough_rw(dict, 'w', callback, data) int dictionary_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const DICTIONARY_ITEM *item, void *value, void *data), void *data); -#define dictionary_sorted_walkthrough_read(dict, callback, data) dictionary_sorted_walkthrough_rw(dict, 'r', callback, data) -#define dictionary_sorted_walkthrough_write(dict, callback, data) dictionary_sorted_walkthrough_rw(dict, 'w', callback, data) -int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const DICTIONARY_ITEM *item, void *entry, void *data), void *data); +typedef int (*dictionary_sorted_compar)(const DICTIONARY_ITEM **item1, const DICTIONARY_ITEM **item2); + +#define dictionary_sorted_walkthrough_read(dict, callback, data) dictionary_sorted_walkthrough_rw(dict, 'r', callback, data, NULL) +#define dictionary_sorted_walkthrough_write(dict, callback, data) dictionary_sorted_walkthrough_rw(dict, 'w', callback, data, NULL) +int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const DICTIONARY_ITEM *item, void *entry, void *data), void *data, dictionary_sorted_compar compar); // ---------------------------------------------------------------------------- // Traverse with foreach diff --git a/libnetdata/ebpf/ebpf.c b/libnetdata/ebpf/ebpf.c index abca185ec3..b0f3d6e73f 100644 --- a/libnetdata/ebpf/ebpf.c +++ b/libnetdata/ebpf/ebpf.c @@ -471,7 +471,7 @@ void ebpf_update_pid_table(ebpf_local_maps_t *pid, ebpf_module_t *em) * @param em the structure with information about how the module/thread is working. * @param map_name the name of the file used to log. 
*/ -void ebpf_update_map_size(struct bpf_map *map, ebpf_local_maps_t *lmap, ebpf_module_t *em, const char *map_name) +void ebpf_update_map_size(struct bpf_map *map, ebpf_local_maps_t *lmap, ebpf_module_t *em, const char *map_name __maybe_unused) { uint32_t define_size = 0; uint32_t apps_type = NETDATA_EBPF_MAP_PID | NETDATA_EBPF_MAP_RESIZABLE; diff --git a/libnetdata/inlined.h b/libnetdata/inlined.h index 788a0312b6..ab09e64dec 100644 --- a/libnetdata/inlined.h +++ b/libnetdata/inlined.h @@ -45,7 +45,7 @@ static inline uint32_t simple_uhash(const char *name) { static inline int str2i(const char *s) { int n = 0; char c, negative = (char)(*s == '-'); - const char *e = &s[30]; // max number of character to iterate + const char *e = s + 30; // max number of character to iterate for(c = (char)((negative)?*(++s):*s); c >= '0' && c <= '9' && s < e ; c = *(++s)) { n *= 10; diff --git a/libnetdata/socket/socket.c b/libnetdata/socket/socket.c index 4344d982a1..6c288f505d 100644 --- a/libnetdata/socket/socket.c +++ b/libnetdata/socket/socket.c @@ -919,6 +919,53 @@ int connect_to_one_of_urls(const char *destination, int default_port, struct tim } +#ifdef ENABLE_HTTPS +ssize_t netdata_ssl_read(SSL *ssl, void *buf, size_t num) { + error_limit_static_thread_var(erl, 1, 0); + + int bytes, err, retries = 0; + + do { + bytes = SSL_read(ssl, buf, (int)num); + err = SSL_get_error(ssl, bytes); + retries++; + } while (bytes <= 0 && (err == SSL_ERROR_WANT_READ)); + + if(unlikely(bytes <= 0)) + error("SSL_read() returned %d bytes, SSL error %d", bytes, err); + + if(retries > 1) + error_limit(&erl, "SSL_read() retried %d times", retries); + + return bytes; +} + +ssize_t netdata_ssl_write(SSL *ssl, const void *buf, size_t num) { + error_limit_static_thread_var(erl, 1, 0); + + int bytes, err, retries = 0; + size_t total = 0; + + do { + bytes = SSL_write(ssl, (uint8_t *)buf + total, (int)(num - total)); + err = SSL_get_error(ssl, bytes); + retries++; + + if(bytes > 0) + total += bytes; + + } 
while ((bytes <= 0 && (err == SSL_ERROR_WANT_WRITE)) || (bytes > 0 && total < num)); + + if(unlikely(bytes <= 0)) + error("SSL_read() returned %d bytes, SSL error %d", bytes, err); + + if(retries > 1) + error_limit(&erl, "SSL_read() retried %d times", retries); + + return bytes; +} +#endif + // -------------------------------------------------------------------------------------------------------------------- // helpers to send/receive data in one call, in blocking mode, with a timeout @@ -956,12 +1003,10 @@ ssize_t recv_timeout(int sockfd, void *buf, size_t len, int flags, int timeout) } #ifdef ENABLE_HTTPS - if (ssl->conn) { - if (!ssl->flags) { - return SSL_read(ssl->conn,buf,len); - } - } + if (ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) + return netdata_ssl_read(ssl->conn, buf, len); #endif + return recv(sockfd, buf, len, flags); } @@ -1001,7 +1046,7 @@ ssize_t send_timeout(int sockfd, void *buf, size_t len, int flags, int timeout) #ifdef ENABLE_HTTPS if(ssl->conn) { if (ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { - return SSL_write(ssl->conn, buf, len); + return netdata_ssl_write(ssl->conn, buf, len); } else { error("cannot write to SSL connection - connection is not ready."); diff --git a/libnetdata/socket/socket.h b/libnetdata/socket/socket.h index 581ca7ccfe..2823242738 100644 --- a/libnetdata/socket/socket.h +++ b/libnetdata/socket/socket.h @@ -67,6 +67,8 @@ int connect_to_one_of_urls(const char *destination, int default_port, struct tim #ifdef ENABLE_HTTPS ssize_t recv_timeout(struct netdata_ssl *ssl,int sockfd, void *buf, size_t len, int flags, int timeout); ssize_t send_timeout(struct netdata_ssl *ssl,int sockfd, void *buf, size_t len, int flags, int timeout); +ssize_t netdata_ssl_read(SSL *ssl, void *buf, size_t num); +ssize_t netdata_ssl_write(SSL *ssl, const void *buf, size_t num); #else ssize_t recv_timeout(int sockfd, void *buf, size_t len, int flags, int timeout); ssize_t send_timeout(int sockfd, void *buf, size_t len, int 
flags, int timeout); diff --git a/libnetdata/worker_utilization/worker_utilization.c b/libnetdata/worker_utilization/worker_utilization.c index 7df050f386..114c4ad9f8 100644 --- a/libnetdata/worker_utilization/worker_utilization.c +++ b/libnetdata/worker_utilization/worker_utilization.c @@ -151,10 +151,17 @@ void worker_set_metric(size_t job_id, NETDATA_DOUBLE value) { if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES)) return; - if(worker->per_job_type[job_id].type == WORKER_METRIC_INCREMENTAL) - worker->per_job_type[job_id].custom_value += value; - else - worker->per_job_type[job_id].custom_value = value; + switch(worker->per_job_type[job_id].type) { + case WORKER_METRIC_INCREMENT: + worker->per_job_type[job_id].custom_value += value; + break; + + case WORKER_METRIC_INCREMENTAL_TOTAL: + case WORKER_METRIC_ABSOLUTE: + default: + worker->per_job_type[job_id].custom_value = value; + break; + } } // statistics interface @@ -200,11 +207,12 @@ void workers_foreach(const char *workname, void (*callback)( switch(p->per_job_type[i].type) { default: - case WORKER_METRIC_EMPTY: + case WORKER_METRIC_EMPTY: { per_job_type_jobs_started[i] = 0; per_job_type_busy_time[i] = 0; per_job_custom_values[i] = NAN; break; + } case WORKER_METRIC_IDLE_BUSY: { size_t tmp_jobs_started = p->per_job_type[i].worker_jobs_started; @@ -219,14 +227,16 @@ void workers_foreach(const char *workname, void (*callback)( break; } - case WORKER_METRIC_ABSOLUTE: + case WORKER_METRIC_ABSOLUTE: { per_job_type_jobs_started[i] = 0; per_job_type_busy_time[i] = 0; per_job_custom_values[i] = p->per_job_type[i].custom_value; break; + } - case WORKER_METRIC_INCREMENTAL: { + case WORKER_METRIC_INCREMENTAL_TOTAL: + case WORKER_METRIC_INCREMENT: { per_job_type_jobs_started[i] = 0; per_job_type_busy_time[i] = 0; diff --git a/libnetdata/worker_utilization/worker_utilization.h b/libnetdata/worker_utilization/worker_utilization.h index aed893ed2d..954fcdcba7 100644 --- 
a/libnetdata/worker_utilization/worker_utilization.h +++ b/libnetdata/worker_utilization/worker_utilization.h @@ -11,7 +11,8 @@ typedef enum { WORKER_METRIC_EMPTY = 0, WORKER_METRIC_IDLE_BUSY = 1, WORKER_METRIC_ABSOLUTE = 2, - WORKER_METRIC_INCREMENTAL = 3, + WORKER_METRIC_INCREMENT = 3, + WORKER_METRIC_INCREMENTAL_TOTAL = 4, } WORKER_METRIC_TYPE; void worker_register(const char *workname); |