summaryrefslogtreecommitdiffstats
path: root/libnetdata
diff options
context:
space:
mode:
author: Costa Tsaousis <costa@netdata.cloud> 2022-11-20 23:47:53 +0200
committer: GitHub <noreply@github.com> 2022-11-20 23:47:53 +0200
commit: 284f6f3aa4f36cefad2601c490510621496c2b53 (patch)
tree: 97a7d55627ef7477f431c53a20d0e6f1f738a419 /libnetdata
parent: 2d02484954f68bf7e3015cb649e2f10a9f3c5c95 (diff)
streaming compression, query planner and replication fixes (#14023)
* streaming compression, query planner and replication fixes * remove journal v2 stats from global statistics * disable sql for checking past sql UUIDs * single threaded replication * final replication thread using dictionaries and JudyL for sorting the pending requests * do not timeout the sending socket when there are pending replication requests * streaming receiver using read() instead of fread() * remove FILE * from streaming - now using posix read() and write() * increase timeouts to 10 minutes * apply sender timeout only when there are metrics that are supposed to be streamed * error handling in replication * remove retries on socket read timeout; better error messages * take into account inbound traffic too to detect that a connection is stale * remove race conditions from replication thread * make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed * 2 minutes timeout to retry streaming to a parent that already has this node * remove unnecessary condition check * fix compilation warnings * include judy in replication * wrappers to handle retries for SSL_read and SSL_write * compressed bytes read monitoring * recursive locks on replication to make it faster during flush or cleanup * replication completion chart at the receiver side * simplified recursive mutex * simplified recursive mutex again
Diffstat (limited to 'libnetdata')
-rw-r--r-- libnetdata/arrayalloc/arrayalloc.c 2
-rw-r--r-- libnetdata/dictionary/dictionary.c 9
-rw-r--r-- libnetdata/dictionary/dictionary.h 8
-rw-r--r-- libnetdata/ebpf/ebpf.c 2
-rw-r--r-- libnetdata/inlined.h 2
-rw-r--r-- libnetdata/socket/socket.c 57
-rw-r--r-- libnetdata/socket/socket.h 2
-rw-r--r-- libnetdata/worker_utilization/worker_utilization.c 24
-rw-r--r-- libnetdata/worker_utilization/worker_utilization.h 3
9 files changed, 87 insertions, 22 deletions
diff --git a/libnetdata/arrayalloc/arrayalloc.c b/libnetdata/arrayalloc/arrayalloc.c
index f4d3cb0312..f337279aea 100644
--- a/libnetdata/arrayalloc/arrayalloc.c
+++ b/libnetdata/arrayalloc/arrayalloc.c
@@ -344,7 +344,7 @@ void arrayalloc_freez(ARAL *ar, void *ptr) {
#endif
}
-#ifdef NETDATA_INTERNAL_CHECKS
+#ifdef NETDATA_ARRAYALLOC_INTERNAL_CHECKS
{
// find the page ptr belongs
ARAL_PAGE *page2 = find_page_with_allocation_internal_check(ar, ptr);
diff --git a/libnetdata/dictionary/dictionary.c b/libnetdata/dictionary/dictionary.c
index e362acdd8b..29456200a9 100644
--- a/libnetdata/dictionary/dictionary.c
+++ b/libnetdata/dictionary/dictionary.c
@@ -2171,11 +2171,13 @@ int dictionary_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const D
// ----------------------------------------------------------------------------
// sorted walkthrough
+typedef int (*qsort_compar)(const void *item1, const void *item2);
+
static int dictionary_sort_compar(const void *item1, const void *item2) {
return strcmp(item_get_name((*(DICTIONARY_ITEM **)item1)), item_get_name((*(DICTIONARY_ITEM **)item2)));
}
-int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const DICTIONARY_ITEM *item, void *entry, void *data), void *data) {
+int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const DICTIONARY_ITEM *item, void *entry, void *data), void *data, dictionary_sorted_compar compar) {
if(unlikely(!dict || !callback)) return 0;
if(unlikely(is_dictionary_destroyed(dict))) {
@@ -2200,7 +2202,10 @@ int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(
if(unlikely(i != entries))
entries = i;
- qsort(array, entries, sizeof(DICTIONARY_ITEM *), dictionary_sort_compar);
+ if(compar)
+ qsort(array, entries, sizeof(DICTIONARY_ITEM *), (qsort_compar)compar);
+ else
+ qsort(array, entries, sizeof(DICTIONARY_ITEM *), dictionary_sort_compar);
bool callit = true;
int ret = 0, r;
diff --git a/libnetdata/dictionary/dictionary.h b/libnetdata/dictionary/dictionary.h
index d002385cb3..4e2ba751fa 100644
--- a/libnetdata/dictionary/dictionary.h
+++ b/libnetdata/dictionary/dictionary.h
@@ -230,9 +230,11 @@ size_t dictionary_acquired_item_references(DICT_ITEM_CONST DICTIONARY_ITEM *item
#define dictionary_walkthrough_write(dict, callback, data) dictionary_walkthrough_rw(dict, 'w', callback, data)
int dictionary_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const DICTIONARY_ITEM *item, void *value, void *data), void *data);
-#define dictionary_sorted_walkthrough_read(dict, callback, data) dictionary_sorted_walkthrough_rw(dict, 'r', callback, data)
-#define dictionary_sorted_walkthrough_write(dict, callback, data) dictionary_sorted_walkthrough_rw(dict, 'w', callback, data)
-int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const DICTIONARY_ITEM *item, void *entry, void *data), void *data);
+typedef int (*dictionary_sorted_compar)(const DICTIONARY_ITEM **item1, const DICTIONARY_ITEM **item2);
+
+#define dictionary_sorted_walkthrough_read(dict, callback, data) dictionary_sorted_walkthrough_rw(dict, 'r', callback, data, NULL)
+#define dictionary_sorted_walkthrough_write(dict, callback, data) dictionary_sorted_walkthrough_rw(dict, 'w', callback, data, NULL)
+int dictionary_sorted_walkthrough_rw(DICTIONARY *dict, char rw, int (*callback)(const DICTIONARY_ITEM *item, void *entry, void *data), void *data, dictionary_sorted_compar compar);
// ----------------------------------------------------------------------------
// Traverse with foreach
diff --git a/libnetdata/ebpf/ebpf.c b/libnetdata/ebpf/ebpf.c
index abca185ec3..b0f3d6e73f 100644
--- a/libnetdata/ebpf/ebpf.c
+++ b/libnetdata/ebpf/ebpf.c
@@ -471,7 +471,7 @@ void ebpf_update_pid_table(ebpf_local_maps_t *pid, ebpf_module_t *em)
* @param em the structure with information about how the module/thread is working.
* @param map_name the name of the file used to log.
*/
-void ebpf_update_map_size(struct bpf_map *map, ebpf_local_maps_t *lmap, ebpf_module_t *em, const char *map_name)
+void ebpf_update_map_size(struct bpf_map *map, ebpf_local_maps_t *lmap, ebpf_module_t *em, const char *map_name __maybe_unused)
{
uint32_t define_size = 0;
uint32_t apps_type = NETDATA_EBPF_MAP_PID | NETDATA_EBPF_MAP_RESIZABLE;
diff --git a/libnetdata/inlined.h b/libnetdata/inlined.h
index 788a0312b6..ab09e64dec 100644
--- a/libnetdata/inlined.h
+++ b/libnetdata/inlined.h
@@ -45,7 +45,7 @@ static inline uint32_t simple_uhash(const char *name) {
static inline int str2i(const char *s) {
int n = 0;
char c, negative = (char)(*s == '-');
- const char *e = &s[30]; // max number of character to iterate
+ const char *e = s + 30; // max number of character to iterate
for(c = (char)((negative)?*(++s):*s); c >= '0' && c <= '9' && s < e ; c = *(++s)) {
n *= 10;
diff --git a/libnetdata/socket/socket.c b/libnetdata/socket/socket.c
index 4344d982a1..6c288f505d 100644
--- a/libnetdata/socket/socket.c
+++ b/libnetdata/socket/socket.c
@@ -919,6 +919,53 @@ int connect_to_one_of_urls(const char *destination, int default_port, struct tim
}
+#ifdef ENABLE_HTTPS
+ssize_t netdata_ssl_read(SSL *ssl, void *buf, size_t num) {
+ error_limit_static_thread_var(erl, 1, 0);
+
+ int bytes, err, retries = 0;
+
+ do {
+ bytes = SSL_read(ssl, buf, (int)num);
+ err = SSL_get_error(ssl, bytes);
+ retries++;
+ } while (bytes <= 0 && (err == SSL_ERROR_WANT_READ));
+
+ if(unlikely(bytes <= 0))
+ error("SSL_read() returned %d bytes, SSL error %d", bytes, err);
+
+ if(retries > 1)
+ error_limit(&erl, "SSL_read() retried %d times", retries);
+
+ return bytes;
+}
+
+ssize_t netdata_ssl_write(SSL *ssl, const void *buf, size_t num) {
+ error_limit_static_thread_var(erl, 1, 0);
+
+ int bytes, err, retries = 0;
+ size_t total = 0;
+
+ do {
+ bytes = SSL_write(ssl, (uint8_t *)buf + total, (int)(num - total));
+ err = SSL_get_error(ssl, bytes);
+ retries++;
+
+ if(bytes > 0)
+ total += bytes;
+
+ } while ((bytes <= 0 && (err == SSL_ERROR_WANT_WRITE)) || (bytes > 0 && total < num));
+
+ if(unlikely(bytes <= 0))
+ error("SSL_read() returned %d bytes, SSL error %d", bytes, err);
+
+ if(retries > 1)
+ error_limit(&erl, "SSL_read() retried %d times", retries);
+
+ return bytes;
+}
+#endif
+
// --------------------------------------------------------------------------------------------------------------------
// helpers to send/receive data in one call, in blocking mode, with a timeout
@@ -956,12 +1003,10 @@ ssize_t recv_timeout(int sockfd, void *buf, size_t len, int flags, int timeout)
}
#ifdef ENABLE_HTTPS
- if (ssl->conn) {
- if (!ssl->flags) {
- return SSL_read(ssl->conn,buf,len);
- }
- }
+ if (ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+ return netdata_ssl_read(ssl->conn, buf, len);
#endif
+
return recv(sockfd, buf, len, flags);
}
@@ -1001,7 +1046,7 @@ ssize_t send_timeout(int sockfd, void *buf, size_t len, int flags, int timeout)
#ifdef ENABLE_HTTPS
if(ssl->conn) {
if (ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
- return SSL_write(ssl->conn, buf, len);
+ return netdata_ssl_write(ssl->conn, buf, len);
}
else {
error("cannot write to SSL connection - connection is not ready.");
diff --git a/libnetdata/socket/socket.h b/libnetdata/socket/socket.h
index 581ca7ccfe..2823242738 100644
--- a/libnetdata/socket/socket.h
+++ b/libnetdata/socket/socket.h
@@ -67,6 +67,8 @@ int connect_to_one_of_urls(const char *destination, int default_port, struct tim
#ifdef ENABLE_HTTPS
ssize_t recv_timeout(struct netdata_ssl *ssl,int sockfd, void *buf, size_t len, int flags, int timeout);
ssize_t send_timeout(struct netdata_ssl *ssl,int sockfd, void *buf, size_t len, int flags, int timeout);
+ssize_t netdata_ssl_read(SSL *ssl, void *buf, size_t num);
+ssize_t netdata_ssl_write(SSL *ssl, const void *buf, size_t num);
#else
ssize_t recv_timeout(int sockfd, void *buf, size_t len, int flags, int timeout);
ssize_t send_timeout(int sockfd, void *buf, size_t len, int flags, int timeout);
diff --git a/libnetdata/worker_utilization/worker_utilization.c b/libnetdata/worker_utilization/worker_utilization.c
index 7df050f386..114c4ad9f8 100644
--- a/libnetdata/worker_utilization/worker_utilization.c
+++ b/libnetdata/worker_utilization/worker_utilization.c
@@ -151,10 +151,17 @@ void worker_set_metric(size_t job_id, NETDATA_DOUBLE value) {
if(unlikely(job_id >= WORKER_UTILIZATION_MAX_JOB_TYPES))
return;
- if(worker->per_job_type[job_id].type == WORKER_METRIC_INCREMENTAL)
- worker->per_job_type[job_id].custom_value += value;
- else
- worker->per_job_type[job_id].custom_value = value;
+ switch(worker->per_job_type[job_id].type) {
+ case WORKER_METRIC_INCREMENT:
+ worker->per_job_type[job_id].custom_value += value;
+ break;
+
+ case WORKER_METRIC_INCREMENTAL_TOTAL:
+ case WORKER_METRIC_ABSOLUTE:
+ default:
+ worker->per_job_type[job_id].custom_value = value;
+ break;
+ }
}
// statistics interface
@@ -200,11 +207,12 @@ void workers_foreach(const char *workname, void (*callback)(
switch(p->per_job_type[i].type) {
default:
- case WORKER_METRIC_EMPTY:
+ case WORKER_METRIC_EMPTY: {
per_job_type_jobs_started[i] = 0;
per_job_type_busy_time[i] = 0;
per_job_custom_values[i] = NAN;
break;
+ }
case WORKER_METRIC_IDLE_BUSY: {
size_t tmp_jobs_started = p->per_job_type[i].worker_jobs_started;
@@ -219,14 +227,16 @@ void workers_foreach(const char *workname, void (*callback)(
break;
}
- case WORKER_METRIC_ABSOLUTE:
+ case WORKER_METRIC_ABSOLUTE: {
per_job_type_jobs_started[i] = 0;
per_job_type_busy_time[i] = 0;
per_job_custom_values[i] = p->per_job_type[i].custom_value;
break;
+ }
- case WORKER_METRIC_INCREMENTAL: {
+ case WORKER_METRIC_INCREMENTAL_TOTAL:
+ case WORKER_METRIC_INCREMENT: {
per_job_type_jobs_started[i] = 0;
per_job_type_busy_time[i] = 0;
diff --git a/libnetdata/worker_utilization/worker_utilization.h b/libnetdata/worker_utilization/worker_utilization.h
index aed893ed2d..954fcdcba7 100644
--- a/libnetdata/worker_utilization/worker_utilization.h
+++ b/libnetdata/worker_utilization/worker_utilization.h
@@ -11,7 +11,8 @@ typedef enum {
WORKER_METRIC_EMPTY = 0,
WORKER_METRIC_IDLE_BUSY = 1,
WORKER_METRIC_ABSOLUTE = 2,
- WORKER_METRIC_INCREMENTAL = 3,
+ WORKER_METRIC_INCREMENT = 3,
+ WORKER_METRIC_INCREMENTAL_TOTAL = 4,
} WORKER_METRIC_TYPE;
void worker_register(const char *workname);