diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-20 23:47:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-20 23:47:53 +0200 |
commit | 284f6f3aa4f36cefad2601c490510621496c2b53 (patch) | |
tree | 97a7d55627ef7477f431c53a20d0e6f1f738a419 /daemon | |
parent | 2d02484954f68bf7e3015cb649e2f10a9f3c5c95 (diff) |
streaming compression, query planner and replication fixes (#14023)
* streaming compression, query planner and replication fixes
* remove journal v2 stats from global statistics
* disable sql for checking past sql UUIDs
* single threaded replication
* final replication thread using dictionaries and JudyL for sorting the pending requests
* do not timeout the sending socket when there are pending replication requests
* streaming receiver using read() instead of fread()
* remove FILE * from streaming - now using posix read() and write()
* increase timeouts to 10 minutes
* apply sender timeout only when there are metrics that are supposed to be streamed
* error handling in replication
* remove retries on socket read timeout; better error messages
* take into account inbound traffic too to detect that a connection is stale
* remove race conditions from replication thread
* make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed
* 2 minutes timeout to retry streaming to a parent that already has this node
* remove unnecessary condition check
* fix compilation warnings
* include judy in replication
* wrappers to handle retries for SSL_read and SSL_write
* compressed bytes read monitoring
* recursive locks on replication to make it faster during flush or cleanup
* replication completion chart at the receiver side
* simplified recursive mutex
* simplified recursive mutex again
Diffstat (limited to 'daemon')
-rw-r--r-- | daemon/global_statistics.c | 3 | ||||
-rw-r--r-- | daemon/main.c | 12 | ||||
-rw-r--r-- | daemon/service.c | 11 | ||||
-rw-r--r-- | daemon/static_threads.c | 31 |
4 files changed, 41 insertions, 16 deletions
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c index e81fc49d11..2cdf1f71f3 100644 --- a/daemon/global_statistics.c +++ b/daemon/global_statistics.c @@ -1848,6 +1848,7 @@ static struct worker_utilization all_workers_utilization[] = { { .name = "TIMEX", .family = "workers plugin timex", .priority = 1000000 }, { .name = "IDLEJITTER", .family = "workers plugin idlejitter", .priority = 1000000 }, { .name = "RRDCONTEXT", .family = "workers contexts", .priority = 1000000 }, + { .name = "REPLICATION", .family = "workers replication sender", .priority = 1000000 }, { .name = "SERVICE", .family = "workers service", .priority = 1000000 }, // has to be terminated with a NULL @@ -2203,7 +2204,7 @@ static void workers_utilization_update_chart(struct worker_utilization *wu) { { size_t i; for (i = 0; i < WORKER_UTILIZATION_MAX_JOB_TYPES; i++) { - if(wu->per_job_type[i].type != WORKER_METRIC_INCREMENTAL) + if(wu->per_job_type[i].type != WORKER_METRIC_INCREMENT && wu->per_job_type[i].type != WORKER_METRIC_INCREMENTAL_TOTAL) continue; if(!wu->per_job_type[i].count_value) diff --git a/daemon/main.c b/daemon/main.c index 5c437d208d..67d24d6977 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -4,6 +4,7 @@ #include "buildinfo.h" #include "static_threads.h" +bool unittest_running = false; int netdata_zero_metrics_enabled; int netdata_anonymous_statistics_enabled; @@ -678,7 +679,7 @@ static void get_netdata_configured_variables() { // ------------------------------------------------------------------------ // get default Database Engine page cache size in MiB - db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_NO); + db_engine_use_malloc = config_get_boolean(CONFIG_SECTION_DB, "dbengine page cache with malloc", CONFIG_BOOLEAN_YES); default_rrdeng_page_cache_mb = (int) config_get_number(CONFIG_SECTION_DB, "dbengine page cache size MB", default_rrdeng_page_cache_mb); if(default_rrdeng_page_cache_mb < 
RRDENG_MIN_PAGE_CACHE_SIZE_MB) { error("Invalid page cache size %d given. Defaulting to %d.", default_rrdeng_page_cache_mb, RRDENG_MIN_PAGE_CACHE_SIZE_MB); @@ -982,6 +983,8 @@ int main(int argc, char **argv) { } if(strcmp(optarg, "unittest") == 0) { + unittest_running = true; + if (unit_test_static_threads()) return 1; if (unit_test_buffer()) @@ -1028,24 +1031,31 @@ int main(int argc, char **argv) { #endif #ifdef ENABLE_DBENGINE else if(strcmp(optarg, "mctest") == 0) { + unittest_running = true; return mc_unittest(); } else if(strcmp(optarg, "ctxtest") == 0) { + unittest_running = true; return ctx_unittest(); } else if(strcmp(optarg, "dicttest") == 0) { + unittest_running = true; return dictionary_unittest(10000); } else if(strcmp(optarg, "araltest") == 0) { + unittest_running = true; return aral_unittest(10000); } else if(strcmp(optarg, "stringtest") == 0) { + unittest_running = true; return string_unittest(10000); } else if(strcmp(optarg, "rrdlabelstest") == 0) { + unittest_running = true; return rrdlabels_unittest(); } else if(strcmp(optarg, "metatest") == 0) { + unittest_running = true; return metadata_unittest(); } else if(strncmp(optarg, createdataset_string, strlen(createdataset_string)) == 0) { diff --git a/daemon/service.c b/daemon/service.c index be1aad8164..28fdc84756 100644 --- a/daemon/service.c +++ b/daemon/service.c @@ -156,17 +156,20 @@ static void svc_rrdhost_cleanup_obsolete_charts(RRDHOST *host) { static void svc_rrdset_check_obsoletion(RRDHOST *host) { worker_is_busy(WORKER_JOB_CHILD_CHART_OBSOLETION_CHECK); + time_t now = now_realtime_sec(); time_t last_entry_t; RRDSET *st; rrdset_foreach_read(st, host) { + if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED)) + continue; + last_entry_t = rrdset_last_entry_t(st); - if(last_entry_t && last_entry_t < host->senders_connect_time && host->senders_connect_time - + TIME_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT + ITERATIONS_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT * st->update_every - < 
now_realtime_sec()) + if(last_entry_t && last_entry_t < host->senders_connect_time && + host->senders_connect_time + TIME_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT + ITERATIONS_TO_RUN_OBSOLETIONS_ON_CHILD_CONNECT * st->update_every + < now) rrdset_is_obsolete(st); - } rrdset_foreach_done(st); } diff --git a/daemon/static_threads.c b/daemon/static_threads.c index 707866ee68..9d2f30c355 100644 --- a/daemon/static_threads.c +++ b/daemon/static_threads.c @@ -2,16 +2,17 @@ #include "common.h" -extern void *aclk_main(void *ptr); -extern void *analytics_main(void *ptr); -extern void *checks_main(void *ptr); -extern void *cpuidlejitter_main(void *ptr); -extern void *global_statistics_main(void *ptr); -extern void *health_main(void *ptr); -extern void *pluginsd_main(void *ptr); -extern void *service_main(void *ptr); -extern void *statsd_main(void *ptr); -extern void *timex_main(void *ptr); +void *aclk_main(void *ptr); +void *analytics_main(void *ptr); +void *checks_main(void *ptr); +void *cpuidlejitter_main(void *ptr); +void *global_statistics_main(void *ptr); +void *health_main(void *ptr); +void *pluginsd_main(void *ptr); +void *service_main(void *ptr); +void *statsd_main(void *ptr); +void *timex_main(void *ptr); +void *replication_thread_main(void *ptr __maybe_unused); extern bool global_statistics_enabled; @@ -140,6 +141,16 @@ const struct netdata_static_thread static_threads_common[] = { .start_routine = rrdcontext_main }, + { + .name = "replication", + .config_section = NULL, + .config_name = NULL, + .enabled = 1, + .thread = NULL, + .init_routine = NULL, + .start_routine = replication_thread_main + }, + // terminator { .name = NULL, |