summaryrefslogtreecommitdiffstats
path: root/daemon
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-20 23:47:53 +0200
committerGitHub <noreply@github.com>2022-11-20 23:47:53 +0200
commit284f6f3aa4f36cefad2601c490510621496c2b53 (patch)
tree97a7d55627ef7477f431c53a20d0e6f1f738a419 /daemon
parent2d02484954f68bf7e3015cb649e2f10a9f3c5c95 (diff)
streaming compression, query planner and replication fixes (#14023)
* streaming compression, query planner and replication fixes * remove journal v2 stats from global statistics * disable sql for checking past sql UUIDs * single threaded replication * final replication thread using dictionaries and JudyL for sorting the pending requests * do not timeout the sending socket when there are pending replication requests * streaming receiver using read() instead of fread() * remove FILE * from streaming - now using posix read() and write() * increase timeouts to 10 minutes * apply sender timeout only when there are metrics that are supposed to be streamed * error handling in replication * remove retries on socket read timeout; better error messages * take into account inbound traffic too to detect that a connection is stale * remove race conditions from replication thread * make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed * 2 minutes timeout to retry streaming to a parent that already has this node * remove unecessary condition check * fix compilation warnings * include judy in replication * wrappers to handle retries for SSL_read and SSL_write * compressed bytes read monitoring * recursive locks on replication to make it faster during flush or cleanup * replication completion chart at the receiver side * simplified recursive mutex * simplified recursive mutex again
Diffstat (limited to 'daemon')
-rw-r--r--daemon/global_statistics.c3
-rw-r--r--daemon/main.c12
-rw-r--r--daemon/service.c11
-rw-r--r--daemon/static_threads.c31
4 files changed, 41 insertions, 16 deletions
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c
index e81fc49d11..2cdf1f71f3 100644
--- a/daemon/global_statistics.c
+++ b/daemon/global_statistics.c
@@ -1848,6 +1848,7 @@ static struct worker_utilization all_workers_utilization[] = {
{ .name = "TIMEX", .family = "workers plugin timex", .priority = 1000000 },
{ .name = "IDLEJITTER", .family = "workers plugin idlejitter", .priority = 1000000 },
{ .name = "RRDCONTEXT", .family = "workers contexts", .priority = 1000000 },
+ { .name = "REPLICATION", .family = "workers replication sender", .priority = 1000000 },
{ .name = "SERVICE", .family = "workers service", .priority = 1000000 },
// has to be terminated with a NULL
@@ -2203,7 +2204,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) {
{
size_t i;
for (i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES; i++) {
- if(wu->per_job_type[i].type != WORKER_METRIC_INCREMENTAL)
+ if(wu->per_job_type[i].type != WORKER_METRIC_INCREMENT && wu->per_job_type[i].type != WORKER_METRIC_INCREMENTAL_TOTAL)
continue;
if(!wu->per_job_type[i].count_value)
diff --git a/daemon/main.c b/daemon/main.c
index 5c437d208d..67d24d6977 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -4,6 +4,7 @@
#include "buildinfo.h"
#include "static_threads.h"
+bool unittest_running = false;
int netdata_zero_metrics_enabled;
int netdata_anonymous_statistics_enabled;
@@ -678,7 +679,7 @@ static void get_netdata_configured_variables() {
// ------------------------------------------------------------------------
// get default Database Engine page cache size in MiB
- db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_NO);
+ db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_YES);
default_rrdeng_page_cache_mb = (int) config_get_number(CONFIG_SECTION_DB, "dbengine page cache size MB", default_rrdeng_page_cache_mb);
if(default_rrdeng_page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB) {
error("Invalid page cache size %d given. Defaulting to %d.", default_rrdeng_page_cache_mb, RRDENG_MIN_PAGE_CACHE_SIZE_MB);
@@ -982,6 +983,8 @@ int main(int argc, char **argv) {
}
if(strcmp(optarg, "unittest") == 0) {
+ unittest_running = true;
+
if (unit_test_static_threads())
return 1;
if (unit_test_buffer())
@@ -1028,24 +1031,31 @@ int main(int argc, char **argv) {
#endif
#ifdef ENABLE_DBENGINE
else if(strcmp(optarg, "mctest") == 0) {
+ unittest_running = true;
return mc_unittest();
}
else if(strcmp(optarg, "ctxtest") == 0) {
+ unittest_running = true;
return ctx_unittest();
}
else if(strcmp(optarg, "dicttest") == 0) {
+ unittest_running = true;
return dictionary_unittest(10000);
}
else if(strcmp(optarg, "araltest") == 0) {
+ unittest_running = true;
return aral_unittest(10000);
}
else if(strcmp(optarg, "stringtest") == 0) {
+ unittest_running = true;
return string_unittest(10000);
}
else if(strcmp(optarg, "rrdlabelstest") == 0) {
+ unittest_running = true;
return rrdlabels_unittest();
}
else if(strcmp(optarg, "metatest") == 0) {
+ unittest_running = true;
return metadata_unittest();
}
else if(strncmp(optarg, createdataset_string, strlen(createdataset_string)) == 0) {
diff --git a/daemon/service.c b/daemon/service.c
index be1aad8164..28fdc84756 100644
--- a/daemon/service.c
+++ b/daemon/service.c
@@ -156,17 +156,20 @@ static void svc_rrdhost_cleanup_obsolete_charts(RRDHOST *host) {
static void svc_rrdset_check_obsoletion(RRDHOST *host) {
worker_is_busy(WORKER_JOB_CHILD_CHART_OBSOLETION_CHECK);
+ time_t now = now_realtime_sec();
time_t last_entry_t;
RRDSET *st;
rrdset_foreach_read(st, host) {
+ if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
+ continue;
+
last_entry_t = rrdset_last_entry_t(st);
- if(last_entry_t && last_entry_t < host->senders_connect_time && host->senders_connect_time
- + TIME_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT + ITERATIONS_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT * st->update_every
- < now_realtime_sec())
+ if(last_entry_t && last_entry_t < host->senders_connect_time &&
+ host->senders_connect_time + TIME_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT + ITERATIONS_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT * st->update_every
+ < now)
rrdset_is_obsolete(st);
-
}
rrdset_foreach_done(st);
}
diff --git a/daemon/static_threads.c b/daemon/static_threads.c
index 707866ee68..9d2f30c355 100644
--- a/daemon/static_threads.c
+++ b/daemon/static_threads.c
@@ -2,16 +2,17 @@
#include "common.h"
-extern void *aclk_main(void *ptr);
-extern void *analytics_main(void *ptr);
-extern void *checks_main(void *ptr);
-extern void *cpuidlejitter_main(void *ptr);
-extern void *global_statistics_main(void *ptr);
-extern void *health_main(void *ptr);
-extern void *pluginsd_main(void *ptr);
-extern void *service_main(void *ptr);
-extern void *statsd_main(void *ptr);
-extern void *timex_main(void *ptr);
+void *aclk_main(void *ptr);
+void *analytics_main(void *ptr);
+void *checks_main(void *ptr);
+void *cpuidlejitter_main(void *ptr);
+void *global_statistics_main(void *ptr);
+void *health_main(void *ptr);
+void *pluginsd_main(void *ptr);
+void *service_main(void *ptr);
+void *statsd_main(void *ptr);
+void *timex_main(void *ptr);
+void *replication_thread_main(void *ptr __maybe_unused);
extern bool global_statistics_enabled;
@@ -140,6 +141,16 @@ const struct netdata_static_thread static_threads_common[] = {
.start_routine = rrdcontext_main
},
+ {
+ .name = "replication",
+ .config_section = NULL,
+ .config_name = NULL,
+ .enabled = 1,
+ .thread = NULL,
+ .init_routine = NULL,
+ .start_routine = replication_thread_main
+ },
+
// terminator
{
.name = NULL,