diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-07-15 01:58:40 +0300 |
---|---|---|
committer | Costa Tsaousis <costa@netdata.cloud> | 2022-07-15 01:58:40 +0300 |
commit | 5b0e4d267bb3d0b4a7f57b2df0265fc5e042e281 (patch) | |
tree | e16d0246e79520c43aa7e0afe170b68d71c1b131 | |
parent | 3a6dc887d2d237f18dccc0ec3375263996baebe7 (diff) |
final cleanup and renaming of global variables
-rw-r--r-- | database/rrdcontext.c | 32 |
1 files changed, 15 insertions, 17 deletions
diff --git a/database/rrdcontext.c b/database/rrdcontext.c index e47ea1799c..75ce4b9d0f 100644 --- a/database/rrdcontext.c +++ b/database/rrdcontext.c @@ -8,6 +8,10 @@ int rrdcontext_enabled = CONFIG_BOOLEAN_YES; +#define MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST 5000 +#define FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS 120 +#define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_SECS 1 + // #define LOG_TRANSITIONS 1 // #define LOG_RRDINSTANCES 1 @@ -1358,7 +1362,7 @@ static void rrdcontext_delete_callback(const char *id, void *value, void *data) rrdcontext_freez(rc); } -static STRING *string_2way_merge(RRDCONTEXT *rc, STRING *a, STRING *b) { +static STRING *string_2way_merge(RRDCONTEXT *rc __maybe_unused, STRING *a, STRING *b) { static STRING *X = NULL, *X2 = NULL; if(unlikely(!X)) { @@ -1402,7 +1406,6 @@ static STRING *string_2way_merge(RRDCONTEXT *rc, STRING *a, STRING *b) { if(strcmp(buf1, string2str(X2)) == 0) return string_dup(a); - internal_error(true, "RRDCONTEXT: '%s' %s() merged '%s' and '%s' as '%s'", string2str(rc->id), __FUNCTION__, string2str(a), string2str(b), buf1); return string_strdupz(buf1); } @@ -1800,8 +1803,10 @@ void rrdcontext_hub_checkpoint_command(void *ptr) { uuid_unparse_lower(*host->node_id, uuid); contexts_snapshot_t bundle = contexts_snapshot_new(host->aclk_state.claimed_id, uuid, our_version_hash); - // calculate version hash and pack all the messages together in one go + // do a deep scan on every metric of the host to make sure all our data are updated rrdcontext_recalculate_host_retention(host, RRD_FLAG_NONE, -1); + + // calculate version hash and pack all the messages together in one go our_version_hash = rrdcontext_version_hash_with_callback(host, rrdcontext_message_send_unsafe, true, bundle); // update the version @@ -1850,7 +1855,6 @@ void rrdcontext_hub_stop_streaming_command(void *ptr) { // ---------------------------------------------------------------------------- // web API - struct rrdcontext_to_json { BUFFER *wb; RRDCONTEXT_TO_JSON_OPTIONS options; @@ -2201,7 +2205,7 @@ void rrdhost_load_rrdcontext_data(RRDHOST *host) { // ---------------------------------------------------------------------------- // the worker thread -static inline usec_t rrdcontext_queued_dispatch_ut(RRDCONTEXT *rc, usec_t now_ut) { +static inline usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc, usec_t now_ut) { if(likely(rc->queue.delay_calc_ut >= rc->queue.queued_ut)) return rc->queue.scheduled_dispatch_ut; @@ -2228,12 +2232,6 @@ static inline usec_t rrdcontext_queued_dispatch_ut(RRDCONTEXT *rc, usec_t now_ut return dispatch_ut; } -#define MESSAGES_TO_SEND_PER_HOST 1000 - -#define RRDCONTEXT_DELAY_AFTER_DB_ROTATION_SECS 120 -#define RRDCONTEXT_CLEANUP_DELETED_EVERY_SECS 3600 -#define RRDCONTEXT_HEARTBEAT_SECS 1 - #define WORKER_JOB_HOSTS 1 #define WORKER_JOB_CHECK 2 #define WORKER_JOB_SEND 3 @@ -2243,10 +2241,10 @@ static inline usec_t rrdcontext_queued_dispatch_ut(RRDCONTEXT *rc, usec_t now_ut #define WORKER_JOB_CLEANUP 7 #define WORKER_JOB_CLEANUP_DELETE 8 -usec_t rrdcontext_next_db_rotation_ut = 0; - +static usec_t rrdcontext_next_db_rotation_ut = 0; void rrdcontext_db_rotation(void) { - rrdcontext_next_db_rotation_ut = now_realtime_usec() + RRDCONTEXT_DELAY_AFTER_DB_ROTATION_SECS * USEC_PER_SEC; + // called when the db rotates its database + rrdcontext_next_db_rotation_ut = now_realtime_usec() + FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS * USEC_PER_SEC; } static uint64_t rrdcontext_version_hash_with_callback( @@ -2411,7 +2409,7 @@ void *rrdcontext_main(void *ptr) { netdata_thread_cleanup_push(rrdcontext_main_cleanup, ptr); heartbeat_t hb; heartbeat_init(&hb); - usec_t step = USEC_PER_SEC * RRDCONTEXT_HEARTBEAT_SECS; + usec_t step = USEC_PER_SEC * RRDCONTEXT_WORKER_THREAD_HEARTBEAT_SECS; while (!netdata_exit) { worker_is_idle(); @@ -2446,11 +2444,11 @@ void *rrdcontext_main(void *ptr) { RRDCONTEXT *rc; dfe_start_write((DICTIONARY *)host->rrdctx_queue, rc) { - if(unlikely(messages_added >= MESSAGES_TO_SEND_PER_HOST)) + if(unlikely(messages_added >= MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST)) break; worker_is_busy(WORKER_JOB_QUEUED); - usec_t dispatch_ut = rrdcontext_queued_dispatch_ut(rc, now_ut); + usec_t dispatch_ut = rrdcontext_calculate_queued_dispatch_time_ut(rc, now_ut); char *claim_id = get_agent_claimid(); if(unlikely(now_ut >= dispatch_ut) && claim_id) { worker_is_busy(WORKER_JOB_CHECK); |