From 58c79fd329df7d2187e4aee56fb4a58a9c02c3ae Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Tue, 6 Sep 2022 19:02:39 +0300 Subject: Faster rrdcontext (#13629) * moved rrdcontexts processing to worker thread * added loggings * check for aclk deeper in the code * removed unessesary logs * code re-organization; cleanup; more comments; better error handling; rrdcontext locks optimization; more clarity * updated 2 comments * make instances walkthrough reentrant; move context lock to the place is really needed * created macro for reentrant dictionary walkthrough * incremental updates on instances and metrics * renamed family of rrdcontext workers * prevent crash in case RRDINSTANCE or RRDMETRIC is freed during shutdown * prevent crash during rrddim save, on out of memory fatal() * always post-process contexts * added tracing for tracking the caller that trigger updates * more details on tracing info * fix for charts that are collected without metrics --- database/rrd.h | 3 +- database/rrdcontext.c | 1996 ++++++++++++++++++++++++++----------------------- database/rrddim.c | 8 +- database/rrdvar.c | 2 +- 4 files changed, 1084 insertions(+), 925 deletions(-) (limited to 'database') diff --git a/database/rrd.h b/database/rrd.h index 36c4e5d58f..4c3b13dfb1 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -916,7 +916,8 @@ struct rrdhost { STORAGE_INSTANCE *storage_instance[RRD_STORAGE_TIERS]; // the database instances of the storage tiers - RRDCONTEXTS *rrdctx_queue; + RRDCONTEXTS *rrdctx_hub_queue; + RRDCONTEXTS *rrdctx_post_processing_queue; RRDCONTEXTS *rrdctx; uuid_t host_uuid; // Global GUID for this host diff --git a/database/rrdcontext.c b/database/rrdcontext.c index b3af8b44cc..dcf8b94e22 100644 --- a/database/rrdcontext.c +++ b/database/rrdcontext.c @@ -10,10 +10,23 @@ int rrdcontext_enabled = CONFIG_BOOLEAN_YES; #define MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST 5000 #define FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS 120 -#define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_SECS 1 +#define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC (1000 * USEC_PER_MS) #define RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY 10 -// #define LOG_TRANSITIONS 1 +#define WORKER_JOB_HOSTS 1 +#define WORKER_JOB_CHECK 2 +#define WORKER_JOB_SEND 3 +#define WORKER_JOB_DEQUEUE 4 +#define WORKER_JOB_RETENTION 5 +#define WORKER_JOB_QUEUED 6 +#define WORKER_JOB_CLEANUP 7 +#define WORKER_JOB_CLEANUP_DELETE 8 +#define WORKER_JOB_PP_METRIC 9 // post-processing metrics +#define WORKER_JOB_PP_INSTANCE 10 // post-processing instances +#define WORKER_JOB_PP_CONTEXT 11 // post-processing contexts +#define WORKER_JOB_HUB_QUEUE_SIZE 12 +#define WORKER_JOB_PP_QUEUE_SIZE 13 + typedef enum { RRD_FLAG_NONE = 0, @@ -23,10 +36,11 @@ typedef enum { RRD_FLAG_ARCHIVED = (1 << 3), // this object is not currently being collected RRD_FLAG_OWN_LABELS = (1 << 4), // this instance has its own labels - not linked to an RRDSET RRD_FLAG_LIVE_RETENTION = (1 << 5), // we have got live retention from the database - RRD_FLAG_QUEUED = (1 << 6), // this context is currently queued to be dispatched to hub - RRD_FLAG_DONT_PROCESS = (1 << 7), // don't process updates for this object + RRD_FLAG_QUEUED_FOR_HUB = (1 << 6), // this context is currently queued to be dispatched to hub + RRD_FLAG_QUEUED_FOR_POST_PROCESSING = (1 << 7), // this context is currently queued to be post-processed RRD_FLAG_HIDDEN = (1 << 8), // don't expose this to the hub or the API + RRD_FLAG_UPDATE_REASON_TRIGGERED = (1 << 9), // the update was triggered by the child object RRD_FLAG_UPDATE_REASON_LOAD_SQL = (1 << 10), // this object has just been loaded from SQL RRD_FLAG_UPDATE_REASON_NEW_OBJECT = (1 << 11), // this object has just been created RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT = (1 << 12), // we received an update on this object @@ -54,7 +68,8 @@ typedef enum { } RRD_FLAGS; #define RRD_FLAG_ALL_UPDATE_REASONS ( \ - RRD_FLAG_UPDATE_REASON_LOAD_SQL \ + RRD_FLAG_UPDATE_REASON_TRIGGERED \ + |RRD_FLAG_UPDATE_REASON_LOAD_SQL \ |RRD_FLAG_UPDATE_REASON_NEW_OBJECT \ |RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT \ |RRD_FLAG_UPDATE_REASON_CHANGED_LINKING \ @@ -79,51 +94,110 @@ typedef enum { #define RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS ( \ RRD_FLAG_ARCHIVED \ - |RRD_FLAG_DONT_PROCESS \ |RRD_FLAG_HIDDEN \ |RRD_FLAG_ALL_UPDATE_REASONS \ ) +#define RRD_FLAGS_REQUIRED_FOR_DELETIONS ( \ + RRD_FLAG_DELETED \ + |RRD_FLAG_LIVE_RETENTION \ +) + #define RRD_FLAGS_PREVENTING_DELETIONS ( \ - RRD_FLAG_QUEUED \ + RRD_FLAG_QUEUED_FOR_HUB \ |RRD_FLAG_COLLECTED \ + \ + /* RRD_FLAG_QUEUED_FOR_POST_PROCESSING */ \ + /* should not be here or nothing will be deleted */ \ ) -#define rrd_flag_set_updated(obj, reason) (obj)->flags |= (RRD_FLAG_UPDATED | (reason)) -#define rrd_flag_unset_updated(obj) (obj)->flags &= ~(RRD_FLAG_UPDATED | RRD_FLAG_ALL_UPDATE_REASONS) - -#define rrd_flag_set_collected(obj) do { \ - if(likely( !((obj)->flags & RRD_FLAG_COLLECTED))) \ - (obj)->flags |= (RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED | RRD_FLAG_UPDATED); \ - if(likely( ((obj)->flags & (RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED)))) \ - (obj)->flags &= ~(RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED); \ - if(unlikely(((obj)->flags & (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION)))) \ - (obj)->flags &= ~(RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); \ - if(unlikely(((obj)->flags & RRD_FLAG_DONT_PROCESS))) \ - (obj)->flags &= ~RRD_FLAG_DONT_PROCESS; \ -} while(0) - -#define rrd_flag_set_archived(obj) do { \ - if(likely( !((obj)->flags & RRD_FLAG_ARCHIVED))) \ - (obj)->flags |= (RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED | RRD_FLAG_UPDATED); \ - if(likely( ((obj)->flags & (RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED)))) \ - (obj)->flags &= ~(RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED); \ - if(unlikely(((obj)->flags & (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION)))) \ - (obj)->flags &= ~(RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); \ -} while(0) - -#define rrd_flag_set_deleted(obj, reason) do { \ - if(likely( !((obj)->flags & RRD_FLAG_DELETED))) \ - (obj)->flags |= (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION | RRD_FLAG_UPDATED | (reason)); \ - if(unlikely(((obj)->flags & RRD_FLAG_ARCHIVED))) \ - (obj)->flags &= ~RRD_FLAG_ARCHIVED; \ - if(likely( ((obj)->flags & RRD_FLAG_COLLECTED))) \ - (obj)->flags &= ~RRD_FLAG_COLLECTED; \ -} while(0) - - -#define rrd_flag_is_collected(obj) ((obj)->flags & RRD_FLAG_COLLECTED) -#define rrd_flag_is_archived(obj) ((obj)->flags & RRD_FLAG_ARCHIVED) +// get all the flags of an object +#define rrd_flags_get(obj) __atomic_load_n(&((obj)->flags), __ATOMIC_SEQ_CST) + +// check if ANY of the given flags (bits) is set +#define rrd_flag_check(obj, flag) (rrd_flags_get(obj) & (flag)) + +// check if ALL of the given flags (bits) are set +#define rrd_flag_check_all(obj, flag) (rrd_flag_check(obj, flag) == (flag)) + +// set one or more flags (bits) +#define rrd_flag_set(obj, flag) __atomic_or_fetch(&((obj)->flags), flag, __ATOMIC_SEQ_CST) + +// clear one or more flags (bits) +#define rrd_flag_clear(obj, flag) __atomic_and_fetch(&((obj)->flags), ~(flag), __ATOMIC_SEQ_CST) + +// replace the flags of an object, with the supplied ones +#define rrd_flags_replace(obj, all_flags) __atomic_store_n(&((obj)->flags), all_flags, __ATOMIC_SEQ_CST) + +static inline void +rrd_flag_add_remove_atomic(RRD_FLAGS *flags, RRD_FLAGS check, RRD_FLAGS conditionally_add, RRD_FLAGS always_remove) { + RRD_FLAGS expected, desired; + do { + expected = *flags; + + desired = expected; + desired &= ~(always_remove); + + if(!(expected & check)) + desired |= (check | conditionally_add); + + } while(!__atomic_compare_exchange_n(flags, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); +} + +#define rrd_flag_set_collected(obj) \ + rrd_flag_add_remove_atomic(&((obj)->flags) \ + /* check this flag */ \ + , RRD_FLAG_COLLECTED \ + \ + /* add these flags together with the above, if the above is not already set */ \ + , RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED | RRD_FLAG_UPDATED \ + \ + /* always remove these flags */ \ + , RRD_FLAG_ARCHIVED \ + | RRD_FLAG_DELETED \ + | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED \ + | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \ + ) + +#define rrd_flag_set_archived(obj) \ + rrd_flag_add_remove_atomic(&((obj)->flags) \ + /* check this flag */ \ + , RRD_FLAG_ARCHIVED \ + \ + /* add these flags together with the above, if the above is not already set */ \ + , RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED | RRD_FLAG_UPDATED \ + \ + /* always remove these flags */ \ + , RRD_FLAG_COLLECTED \ + | RRD_FLAG_DELETED \ + | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED \ + | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \ + ) + +#define rrd_flag_set_deleted(obj, reason) \ + rrd_flag_add_remove_atomic(&((obj)->flags) \ + /* check this flag */ \ + , RRD_FLAG_DELETED \ + \ + /* add these flags together with the above, if the above is not already set */ \ + , RRD_FLAG_UPDATE_REASON_ZERO_RETENTION | RRD_FLAG_UPDATED | (reason) \ + \ + /* always remove these flags */ \ + , RRD_FLAG_ARCHIVED \ + | RRD_FLAG_DELETED \ + ) + +#define rrd_flag_is_collected(obj) rrd_flag_check(obj, RRD_FLAG_COLLECTED) +#define rrd_flag_is_archived(obj) rrd_flag_check(obj, RRD_FLAG_ARCHIVED) +#define rrd_flag_is_deleted(obj) rrd_flag_check(obj, RRD_FLAG_DELETED) +#define rrd_flag_is_updated(obj) rrd_flag_check(obj, RRD_FLAG_UPDATED) + +// mark an object as updated, providing reasons (additional bits) +#define rrd_flag_set_updated(obj, reason) rrd_flag_set(obj, RRD_FLAG_UPDATED | (reason)) + +// clear an object as being updated, clearing also all the reasons +#define rrd_flag_unset_updated(obj) rrd_flag_clear(obj, RRD_FLAG_UPDATED | RRD_FLAG_ALL_UPDATE_REASONS) + static struct rrdcontext_reason { RRD_FLAGS flag; @@ -131,6 +205,7 @@ static struct rrdcontext_reason { usec_t delay_ut; } rrdcontext_reasons[] = { // context related + { RRD_FLAG_UPDATE_REASON_TRIGGERED, "triggered transition", 60 * USEC_PER_SEC }, { RRD_FLAG_UPDATE_REASON_NEW_OBJECT, "object created", 60 * USEC_PER_SEC }, { RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT, "object updated", 60 * USEC_PER_SEC }, { RRD_FLAG_UPDATE_REASON_LOAD_SQL, "loaded from sql", 60 * USEC_PER_SEC }, @@ -173,8 +248,6 @@ typedef struct rrdmetric { RRD_FLAGS flags; struct rrdinstance *ri; - - usec_t created_ut; // the time this object was created } RRDMETRIC; typedef struct rrdinstance { @@ -199,6 +272,12 @@ typedef struct rrdinstance { struct rrdcontext *rc; DICTIONARY *rrdmetrics; + + struct { + uint32_t collected_metrics; // a temporary variable to detect BEGIN/END without SET + // don't use it for other purposes + // it goes up and then resets to zero, on every iteration + } internal; } RRDINSTANCE; typedef struct rrdcontext { @@ -287,31 +366,38 @@ static inline void rrdcontext_release(RRDCONTEXT_ACQUIRED *rca) { dictionary_acquired_item_release((DICTIONARY *)rc->rrdhost->rrdctx, (DICTIONARY_ITEM *)rca); } -static void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, int job_id); -static void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, int job_id); +static void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, bool worker_jobs); +static void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, bool worker_jobs); #define rrdcontext_version_hash(host) rrdcontext_version_hash_with_callback(host, NULL, false, NULL) static uint64_t rrdcontext_version_hash_with_callback(RRDHOST *host, void (*callback)(RRDCONTEXT *, bool, void *), bool snapshot, void *bundle); -static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker); -static void rrdcontext_garbage_collect(void); +static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jobs); +static void rrdcontext_garbage_collect_for_all_hosts(void); void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc); #define rrdcontext_lock(rc) netdata_mutex_lock(&((rc)->mutex)) #define rrdcontext_unlock(rc) netdata_mutex_unlock(&((rc)->mutex)) // ---------------------------------------------------------------------------- -// Updates triggers +// Forward definitions + +static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc); +static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused); +static void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused, void *bundle __maybe_unused); -static void rrdmetric_trigger_updates(RRDMETRIC *rm, bool force, bool escalate, const char *function); -static void rrdinstance_trigger_updates(RRDINSTANCE *ri, bool force, bool escalate, const char *function); -static void rrdcontext_trigger_updates(RRDCONTEXT *rc, bool force, const char *function); +static void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function, RRD_FLAGS flags); +static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs); + +static void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function); +static void rrdinstance_trigger_updates(RRDINSTANCE *ri, const char *function); +static void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function); // ---------------------------------------------------------------------------- // visualizing flags static void rrd_flags_to_buffer(RRD_FLAGS flags, BUFFER *wb) { - if(flags & RRD_FLAG_QUEUED) + if(flags & RRD_FLAG_QUEUED_FOR_HUB) buffer_strcat(wb, "QUEUED "); if(flags & RRD_FLAG_DELETED) @@ -332,11 +418,11 @@ static void rrd_flags_to_buffer(RRD_FLAGS flags, BUFFER *wb) { if(flags & RRD_FLAG_LIVE_RETENTION) buffer_strcat(wb, "LIVE_RETENTION "); - if(flags & RRD_FLAG_DONT_PROCESS) - buffer_strcat(wb, "DONT_PROCESS "); - if(flags & RRD_FLAG_HIDDEN) buffer_strcat(wb, "HIDDEN "); + + if(flags & RRD_FLAG_QUEUED_FOR_POST_PROCESSING) + buffer_strcat(wb, "PENDING_UPDATES "); } static void rrd_reasons_to_buffer(RRD_FLAGS flags, BUFFER *wb) { @@ -350,55 +436,11 @@ static void rrd_reasons_to_buffer(RRD_FLAGS flags, BUFFER *wb) { } } -// ---------------------------------------------------------------------------- -// logging of all data collected - -#ifdef LOG_TRANSITIONS -static void log_transition(RRDMETRIC *rm, RRDINSTANCE *ri, RRDCONTEXT *rc, const char *function) { - BUFFER *wb = buffer_create(1000); - const char *triggered_on = "triggered on "; - - buffer_sprintf(wb, "RRD TRANSITION: %s() ", function); - - if(rm) { - buffer_sprintf(wb, "%smetric '%s' of ", triggered_on, string2str(rm->id)); - triggered_on = ""; - } - - if(ri) { - buffer_sprintf(wb, "%sinstance '%s' of ", triggered_on, string2str(ri->id)); - triggered_on = ""; - } - - buffer_sprintf(wb, "%scontext '%s' ", triggered_on, string2str(rc->id)); - - RRD_FLAGS flags = rc->flags; - const char *we_are = "context"; - if(ri) { - flags = ri->flags; - we_are = "instance"; - } - if(rm) { - flags = rm->flags; - we_are = "metric"; - } - - buffer_sprintf(wb, "%s flags: ", we_are); - rrd_flags_to_buffer(flags, wb); - - buffer_strcat(wb, ", having reasons: "); - rrd_reasons_to_buffer(flags, wb); - - internal_error(true, "%s", buffer_tostring(wb)); - buffer_free(wb); -} -#else -#define log_transition(rm, ri, rc, function) debug_dummy() -#endif - // ---------------------------------------------------------------------------- // RRDMETRIC +// free the contents of RRDMETRIC. +// RRDMETRIC itself is managed by DICTIONARY - no need to free it here. static void rrdmetric_free(RRDMETRIC *rm) { string_freez(rm->id); string_freez(rm->name); @@ -408,60 +450,8 @@ static void rrdmetric_free(RRDMETRIC *rm) { rm->ri = NULL; } -static void rrdmetric_update_retention(RRDMETRIC *rm) { - time_t min_first_time_t = LONG_MAX, max_last_time_t = 0; - - if(rm->rrddim) { - min_first_time_t = rrddim_first_entry_t(rm->rrddim); - max_last_time_t = rrddim_last_entry_t(rm->rrddim); - } -#ifdef ENABLE_DBENGINE - else { - RRDHOST *rrdhost = rm->ri->rc->rrdhost; - for (int tier = 0; tier < storage_tiers; tier++) { - if(!rrdhost->storage_instance[tier]) continue; - - time_t first_time_t, last_time_t; - if (rrdeng_metric_retention_by_uuid(rrdhost->storage_instance[tier], &rm->uuid, &first_time_t, &last_time_t) == 0) { - if (first_time_t < min_first_time_t) - min_first_time_t = first_time_t; - - if (last_time_t > max_last_time_t) - max_last_time_t = last_time_t; - } - } - } -#endif - - if(min_first_time_t == LONG_MAX) - min_first_time_t = 0; - - if(min_first_time_t > max_last_time_t) { - internal_error(true, "RRDMETRIC: retention of '%s' is flipped", string2str(rm->id)); - time_t tmp = min_first_time_t; - min_first_time_t = max_last_time_t; - max_last_time_t = tmp; - } - - // check if retention changed - - if (min_first_time_t != rm->first_time_t) { - rm->first_time_t = min_first_time_t; - rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); - } - - if (max_last_time_t != rm->last_time_t) { - rm->last_time_t = max_last_time_t; - rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); - } - - if(unlikely(!rm->first_time_t && !rm->last_time_t)) - rrd_flag_set_deleted(rm, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); - - rm->flags |= RRD_FLAG_LIVE_RETENTION; -} - // called when this rrdmetric is inserted to the rrdmetrics dictionary of a rrdinstance +// the constructor of the rrdmetric object static void rrdmetric_insert_callback(const char *id __maybe_unused, void *value, void *data) { RRDMETRIC *rm = value; @@ -469,15 +459,14 @@ static void rrdmetric_insert_callback(const char *id __maybe_unused, void *value rm->ri = data; // remove flags that we need to figure out at runtime - rm->flags = rm->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; - - rm->created_ut = now_realtime_usec(); + rm->flags = rm->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics // signal the react callback to do the job rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_NEW_OBJECT); } // called when this rrdmetric is deleted from the rrdmetrics dictionary of a rrdinstance +// the destructor of the rrdmetric object static void rrdmetric_delete_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) { RRDMETRIC *rm = value; @@ -488,6 +477,7 @@ static void rrdmetric_delete_callback(const char *id __maybe_unused, void *value } // called when the same rrdmetric is inserted again to the rrdmetrics dictionary of a rrdinstance +// while this is called, the dictionary is write locked, but there may be other users of the object static void rrdmetric_conflict_callback(const char *id __maybe_unused, void *oldv, void *newv, void *data __maybe_unused) { RRDMETRIC *rm = oldv; RRDMETRIC *rm_new = newv; @@ -537,26 +527,27 @@ static void rrdmetric_conflict_callback(const char *id __maybe_unused, void *old rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); } - rm->flags |= (rm_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); + rrd_flag_set(rm, rm_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no needs for atomics on rm_new if(rrd_flag_is_collected(rm) && rrd_flag_is_archived(rm)) rrd_flag_set_collected(rm); - if(rm->flags & RRD_FLAG_UPDATED) - rm->flags |= RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT; + if(rrd_flag_check(rm, RRD_FLAG_UPDATED)) + rrd_flag_set(rm, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT); rrdmetric_free(rm_new); // the react callback will continue from here } +// this is called after the insert or the conflict callbacks, +// but the dictionary is now unlocked static void rrdmetric_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) { RRDMETRIC *rm = value; - - rrdmetric_trigger_updates(rm, false, true, __FUNCTION__); + rrdmetric_trigger_updates(rm, __FUNCTION__ ); } -static void rrdmetrics_create(RRDINSTANCE *ri) { +static void rrdmetrics_create_in_rrdinstance(RRDINSTANCE *ri) { if(unlikely(!ri)) return; if(likely(ri->rrdmetrics)) return; @@ -567,51 +558,26 @@ static void rrdmetrics_create(RRDINSTANCE *ri) { dictionary_register_react_callback(ri->rrdmetrics, rrdmetric_react_callback, (void *)ri); } -static void rrdmetrics_destroy(RRDINSTANCE *ri) { +static void rrdmetrics_destroy_from_rrdinstance(RRDINSTANCE *ri) { if(unlikely(!ri || !ri->rrdmetrics)) return; dictionary_destroy(ri->rrdmetrics); ri->rrdmetrics = NULL; } -static inline bool rrdmetric_should_be_deleted(RRDMETRIC *rm) { - if(likely(!(rm->flags & RRD_FLAG_DELETED))) - return false; - - if(likely(!(rm->flags & RRD_FLAG_LIVE_RETENTION))) - return false; - - if(unlikely(rm->flags & RRD_FLAGS_PREVENTING_DELETIONS)) - return false; - - if(likely(rm->rrddim)) - return false; - - //if((now_realtime_usec() - rm->created_ut) < 600 * USEC_PER_SEC) - // return false; - - rrdmetric_update_retention(rm); - if(rm->first_time_t || rm->last_time_t) - return false; - - return true; -} - -static void rrdmetric_trigger_updates(RRDMETRIC *rm, bool force, bool escalate, const char *function __maybe_unused) { - if(likely(!force && !(rm->flags & RRD_FLAG_UPDATED))) return; - - // logs and statistics - log_transition(rm, rm->ri, rm->ri->rc, function); - rrdcontext_triggered_update_on_rrdmetric(); - - if(unlikely(rrd_flag_is_collected(rm)) && (!rm->rrddim || rm->flags & RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD)) +// trigger post-processing of the rrdmetric, escalating changes to the rrdinstance it belongs +static void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function) { + if(unlikely(rrd_flag_is_collected(rm)) && (!rm->rrddim || rrd_flag_check(rm, RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD))) rrd_flag_set_archived(rm); - rrdmetric_update_retention(rm); - - if(unlikely(escalate && rm->flags & RRD_FLAG_UPDATED)) - rrdinstance_trigger_updates(rm->ri, true, true, __FUNCTION__); + if(rrd_flag_is_updated(rm) || !rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION)) { + rrd_flag_set_updated(rm->ri, RRD_FLAG_UPDATE_REASON_TRIGGERED); + rrdcontext_queue_for_post_processing(rm->ri->rc, function, rm->flags); + } } +// ---------------------------------------------------------------------------- +// RRDMETRIC HOOKS ON RRDDIM + static inline void rrdmetric_from_rrddim(RRDDIM *rd) { if(unlikely(!rd->rrdset)) fatal("RRDMETRIC: rrddim '%s' does not have a rrdset.", rrddim_id(rd)); @@ -627,7 +593,7 @@ static inline void rrdmetric_from_rrddim(RRDDIM *rd) { RRDMETRIC trm = { .id = string_dup(rd->id), .name = string_dup(rd->name), - .flags = RRD_FLAG_NONE, + .flags = RRD_FLAG_NONE, // no need for atomics .rrddim = rd, }; uuid_copy(trm.uuid, rd->metric_uuid); @@ -648,6 +614,10 @@ static inline RRDMETRIC *rrddim_get_rrdmetric_with_trace(RRDDIM *rd, const char } RRDMETRIC *rm = rrdmetric_acquired_value(rd->rrdmetric); + if(unlikely(!rm)) { + error("RRDMETRIC: RRDDIM '%s' lost the link to its RRDMETRIC at %s()", rrddim_id(rd), function); + return NULL; + } if(unlikely(rm->rrddim != rd)) fatal("RRDMETRIC: '%s' is not linked to RRDDIM '%s' at %s()", string2str(rm->id), rrddim_id(rd), function); @@ -663,7 +633,7 @@ static inline void rrdmetric_rrddim_is_freed(RRDDIM *rd) { rrd_flag_set_archived(rm); rm->rrddim = NULL; - rrdmetric_trigger_updates(rm, false, true, __FUNCTION__); + rrdmetric_trigger_updates(rm, __FUNCTION__ ); rrdmetric_release(rd->rrdmetric); rd->rrdmetric = NULL; } @@ -677,7 +647,7 @@ static inline void rrdmetric_updated_rrddim_flags(RRDDIM *rd) { rrd_flag_set_archived(rm); } - rrdmetric_trigger_updates(rm, false, true, __FUNCTION__); + rrdmetric_trigger_updates(rm, __FUNCTION__ ); } static inline void rrdmetric_collected_rrddim(RRDDIM *rd) { @@ -687,7 +657,10 @@ static inline void rrdmetric_collected_rrddim(RRDDIM *rd) { if(unlikely(!rrd_flag_is_collected(rm))) rrd_flag_set_collected(rm); - rrdmetric_trigger_updates(rm, false, true, __FUNCTION__); + // we use this variable to detect BEGIN/END without SET + rm->ri->internal.collected_metrics++; + + rrdmetric_trigger_updates(rm, __FUNCTION__ ); } // ---------------------------------------------------------------------------- @@ -695,10 +668,10 @@ static inline void rrdmetric_collected_rrddim(RRDDIM *rd) { static void rrdinstance_free(RRDINSTANCE *ri) { - if(ri->flags & RRD_FLAG_OWN_LABELS) + if(rrd_flag_check(ri, RRD_FLAG_OWN_LABELS)) dictionary_destroy(ri->rrdlabels); - rrdmetrics_destroy(ri); + rrdmetrics_destroy_from_rrdinstance(ri); string_freez(ri->id); string_freez(ri->name); string_freez(ri->title); @@ -727,33 +700,32 @@ static void rrdinstance_insert_callback(const char *id __maybe_unused, void *val // link it to its parent ri->rc = data; - ri->flags = ri->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; + ri->flags = ri->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics if(!ri->name) ri->name = string_dup(ri->id); if(ri->rrdset && ri->rrdset->state) { ri->rrdlabels = ri->rrdset->state->chart_labels; - if(ri->flags & RRD_FLAG_OWN_LABELS) - ri->flags &= ~RRD_FLAG_OWN_LABELS; + ri->flags &= ~RRD_FLAG_OWN_LABELS; // no need of atomics at the constructor } else { ri->rrdlabels = rrdlabels_create(); - ri->flags |= RRD_FLAG_OWN_LABELS; + ri->flags |= RRD_FLAG_OWN_LABELS; // no need of atomics at the constructor } if(ri->rrdset) { if(unlikely((rrdset_flag_check(ri->rrdset, RRDSET_FLAG_HIDDEN)) || (ri->rrdset->state && ri->rrdset->state->is_ar_chart))) - ri->flags |= RRD_FLAG_HIDDEN; + ri->flags |= RRD_FLAG_HIDDEN; // no need of atomics at the constructor else - ri->flags &= ~RRD_FLAG_HIDDEN; + ri->flags &= ~RRD_FLAG_HIDDEN; // no need of atomics at the constructor } // we need this when loading from SQL if(unlikely(ri->id == ml_anomaly_rates_id)) - ri->flags |= RRD_FLAG_HIDDEN; + ri->flags |= RRD_FLAG_HIDDEN; // no need of atomics at the constructor - rrdmetrics_create(ri); + rrdmetrics_create_in_rrdinstance(ri); // signal the react callback to do the job rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_NEW_OBJECT); @@ -840,32 +812,32 @@ static void rrdinstance_conflict_callback(const char *id __maybe_unused, void *o if(ri->rrdset != ri_new->rrdset) { ri->rrdset = ri_new->rrdset; - if(ri->rrdset && (ri->flags & RRD_FLAG_OWN_LABELS)) { + if(ri->rrdset && rrd_flag_check(ri, RRD_FLAG_OWN_LABELS)) { DICTIONARY *old = ri->rrdlabels; ri->rrdlabels = ri->rrdset->state->chart_labels; - ri->flags &= ~RRD_FLAG_OWN_LABELS; + rrd_flag_clear(ri, RRD_FLAG_OWN_LABELS); rrdlabels_destroy(old); } - else if(!ri->rrdset && !(ri->flags & RRD_FLAG_OWN_LABELS)) { + else if(!ri->rrdset && !rrd_flag_check(ri, RRD_FLAG_OWN_LABELS)) { ri->rrdlabels = rrdlabels_create(); - ri->flags |= RRD_FLAG_OWN_LABELS; + rrd_flag_set(ri, RRD_FLAG_OWN_LABELS); } } if(ri->rrdset) { if(unlikely((rrdset_flag_check(ri->rrdset, RRDSET_FLAG_HIDDEN)) || (ri->rrdset->state && ri->rrdset->state->is_ar_chart))) - ri->flags |= RRD_FLAG_HIDDEN; + rrd_flag_set(ri, RRD_FLAG_HIDDEN); else - ri->flags &= ~RRD_FLAG_HIDDEN; + rrd_flag_clear(ri, RRD_FLAG_HIDDEN); } - ri->flags |= (ri_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); + rrd_flag_set(ri, ri_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no need for atomics on ri_new if(rrd_flag_is_collected(ri) && rrd_flag_is_archived(ri)) rrd_flag_set_collected(ri); - if(ri->flags & RRD_FLAG_UPDATED) - ri->flags |= RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT; + if(rrd_flag_is_updated(ri)) + rrd_flag_set(ri, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT); // free the new one rrdinstance_free(ri_new); @@ -876,10 +848,10 @@ static void rrdinstance_conflict_callback(const char *id __maybe_unused, void *o static void rrdinstance_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) { RRDINSTANCE *ri = value; - rrdinstance_trigger_updates(ri, false, true, __FUNCTION__); + rrdinstance_trigger_updates(ri, __FUNCTION__ ); } -void rrdinstances_create(RRDCONTEXT *rc) { +void rrdinstances_create_in_rrdcontext(RRDCONTEXT *rc) { if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO)) return; @@ -892,156 +864,42 @@ void rrdinstances_create(RRDCONTEXT *rc) { dictionary_register_react_callback(rc->rrdinstances, rrdinstance_react_callback, (void *)rc); } -void rrdinstances_destroy(RRDCONTEXT *rc) { +void rrdinstances_destroy_from_rrdcontext(RRDCONTEXT *rc) { if(unlikely(!rc || !rc->rrdinstances)) return; dictionary_destroy(rc->rrdinstances); rc->rrdinstances = NULL; } -static inline bool rrdinstance_should_be_deleted(RRDINSTANCE *ri) { - if(likely(!(ri->flags & RRD_FLAG_DELETED))) - return false; - - if(likely(!(ri->flags & RRD_FLAG_LIVE_RETENTION))) - return false; - - if(unlikely(ri->flags & RRD_FLAGS_PREVENTING_DELETIONS)) - return false; - - if(likely(ri->rrdset)) - return false; - - if(unlikely(dictionary_stats_referenced_items(ri->rrdmetrics) != 0)) - return false; - - if(unlikely(dictionary_stats_entries(ri->rrdmetrics) != 0)) - return false; - - if(ri->first_time_t || ri->last_time_t) - return false; - - return true; -} - -static void rrdinstance_trigger_updates(RRDINSTANCE *ri, bool force, bool escalate, const char *function __maybe_unused) { - if(unlikely(ri->flags & RRD_FLAG_DONT_PROCESS)) return; - if(unlikely(!force && !(ri->flags & RRD_FLAG_UPDATED))) return; - - // logs and stats - log_transition(NULL, ri, ri->rc, function); - rrdcontext_triggered_update_on_rrdinstance(); +static void rrdinstance_trigger_updates(RRDINSTANCE *ri, const char *function) { + RRDSET *st = ri->rrdset; - if(likely(ri->rrdset)) { - if(unlikely(ri->rrdset->priority != ri->priority)) { - ri->priority = ri->rrdset->priority; + if(likely(st)) { + if(unlikely(st->priority != ri->priority)) { + ri->priority = st->priority; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY); } - if(unlikely(ri->rrdset->update_every != ri->update_every)) { - ri->update_every = ri->rrdset->update_every; + if(unlikely(st->update_every != ri->update_every)) { + ri->update_every = st->update_every; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_UPDATE_EVERY); } } else if(unlikely(rrd_flag_is_collected(ri))) { + // there is no rrdset, but we have it as collected! + rrd_flag_set_archived(ri); rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LINKING); } - time_t min_first_time_t = LONG_MAX, max_last_time_t = 0; - size_t metrics_active = 0, metrics_deleted = 0; - bool live_retention = true, currently_collected = false; - if(dictionary_stats_entries(ri->rrdmetrics) > 0) { - RRDMETRIC *rm; - dfe_start_read((DICTIONARY *)ri->rrdmetrics, rm) { - if(unlikely(!(rm->flags & RRD_FLAG_LIVE_RETENTION))) - live_retention = false; - - if (unlikely((rrdmetric_should_be_deleted(rm)))) { - metrics_deleted++; - rrd_flag_unset_updated(rm); - continue; - } - - if(rm->flags & RRD_FLAG_COLLECTED && rm->first_time_t) - currently_collected = true; - - metrics_active++; - - if (rm->first_time_t && rm->first_time_t < min_first_time_t) - min_first_time_t = rm->first_time_t; - - if (rm->last_time_t && rm->last_time_t > max_last_time_t) - max_last_time_t = rm->last_time_t; - - rrd_flag_unset_updated(rm); - } - dfe_done(rm); - } - - if(unlikely(live_retention && !(ri->flags & RRD_FLAG_LIVE_RETENTION))) - ri->flags |= RRD_FLAG_LIVE_RETENTION; - else if(unlikely(!live_retention && (ri->flags & RRD_FLAG_LIVE_RETENTION))) - ri->flags &= ~RRD_FLAG_LIVE_RETENTION; - - if(unlikely(!metrics_active)) { - // no metrics available - - if(ri->first_time_t) { - ri->first_time_t = 0; - rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); - } - - if(ri->last_time_t) { - ri->last_time_t = 0; - rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); - } - - rrd_flag_set_deleted(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); - } - else { - // we have active metrics... - - if (unlikely(min_first_time_t == LONG_MAX)) - min_first_time_t = 0; - - if (unlikely(min_first_time_t == 0 || max_last_time_t == 0)) { - if(ri->first_time_t) { - ri->first_time_t = 0; - rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); - } - - if(ri->last_time_t) { - ri->last_time_t = 0; - rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); - } - - if(likely(live_retention)) - rrd_flag_set_deleted(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); - } - else { - ri->flags &= ~RRD_FLAG_UPDATE_REASON_ZERO_RETENTION; - - if (unlikely(ri->first_time_t != min_first_time_t)) { - ri->first_time_t = min_first_time_t; - rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); - } - - if (unlikely(ri->last_time_t != max_last_time_t)) { - ri->last_time_t = max_last_time_t; - rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); - } - - if(likely(currently_collected)) - rrd_flag_set_collected(ri); - else - rrd_flag_set_archived(ri); - } + if(rrd_flag_is_updated(ri) || !rrd_flag_check(ri, RRD_FLAG_LIVE_RETENTION)) { + rrd_flag_set_updated(ri->rc, RRD_FLAG_UPDATE_REASON_TRIGGERED); + rrdcontext_queue_for_post_processing(ri->rc, function, ri->flags); } - - if(unlikely(escalate && ri->flags & RRD_FLAG_UPDATED)) - rrdcontext_trigger_updates(ri->rc, true, __FUNCTION__); } +// ---------------------------------------------------------------------------- +// RRDINSTANCE HOOKS ON RRDSET + static inline void rrdinstance_from_rrdset(RRDSET *st) { RRDCONTEXT trc = { .id = string_dup(st->context), @@ -1050,7 +908,7 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) { .family = string_dup(st->family), .priority = st->priority, .chart_type = st->chart_type, - .flags = RRD_FLAG_NONE, + .flags = RRD_FLAG_NONE, // no need for atomics .rrdhost = st->rrdhost, }; @@ -1066,7 +924,7 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) { .chart_type = st->chart_type, .priority = st->priority, .update_every = st->update_every, - .flags = RRD_FLAG_DONT_PROCESS, + .flags = RRD_FLAG_NONE, // no need for atomics .rrdset = st, }; uuid_copy(tri.uuid, *st->chart_uuid); @@ -1090,8 +948,9 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) { } if(rca_old && ria_old) { - // the chart changed context - RRDCONTEXT *rc_old = rrdcontext_acquired_value(rca_old); + // Ooops! The chart changed context! + + // RRDCONTEXT *rc_old = rrdcontext_acquired_value(rca_old); RRDINSTANCE *ri_old = rrdinstance_acquired_value(ria_old); // migrate all dimensions to the new metrics @@ -1101,7 +960,7 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) { if (!rd->rrdmetric) continue; RRDMETRIC *rm_old = rrdmetric_acquired_value(rd->rrdmetric); - rm_old->flags = RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION; + rrd_flags_replace(rm_old, RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); rm_old->rrddim = NULL; rm_old->first_time_t = 0; rm_old->last_time_t = 0; @@ -1114,20 +973,15 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) { rrdset_unlock(st); // mark the old instance, ready to be deleted - if(!(ri_old->flags & RRD_FLAG_OWN_LABELS)) + if(!rrd_flag_check(ri_old, RRD_FLAG_OWN_LABELS)) ri_old->rrdlabels = rrdlabels_create(); - ri_old->flags = RRD_FLAG_OWN_LABELS|RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION; + rrd_flags_replace(ri_old, RRD_FLAG_OWN_LABELS|RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); ri_old->rrdset = NULL; ri_old->first_time_t = 0; ri_old->last_time_t = 0; - ri_old->flags &= ~RRD_FLAG_DONT_PROCESS; - rc_old->flags &= ~RRD_FLAG_DONT_PROCESS; - - rrdinstance_trigger_updates(ri_old, true, true, __FUNCTION__); - - ri_old->flags |= RRD_FLAG_DONT_PROCESS; + rrdinstance_trigger_updates(ri_old, __FUNCTION__ ); rrdinstance_release(ria_old); /* @@ -1138,10 +992,10 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) { rc_old->first_time_t = 0; rc_old->last_time_t = 0; rrdcontext_unlock(rc_old); - rrdcontext_trigger_updates(rc_old, true, __FUNCTION__); + rrdcontext_trigger_updates(rc_old, __FUNCTION__ ); } else - rrdcontext_trigger_updates(rc_old, true, __FUNCTION__); + rrdcontext_trigger_updates(rc_old, __FUNCTION__ ); */ rrdcontext_release(rca_old); @@ -1161,6 +1015,10 @@ static inline RRDINSTANCE *rrdset_get_rrdinstance_with_trace(RRDSET *st, const c } RRDINSTANCE *ri = rrdinstance_acquired_value(st->rrdinstance); + if(unlikely(!ri)) { + error("RRDINSTANCE: RRDSET '%s' lost its link to an RRDINSTANCE at %s()", rrdset_id(st), function); + return NULL; + } if(unlikely(ri->rrdset != st)) fatal("RRDINSTANCE: '%s' is not linked to RRDSET '%s' at %s()", string2str(ri->id), rrdset_id(st), function); @@ -1174,17 +1032,15 @@ static inline void rrdinstance_rrdset_is_freed(RRDSET *st) { rrd_flag_set_archived(ri); - if(!(ri->flags & RRD_FLAG_OWN_LABELS)) { - ri->flags |= RRD_FLAG_OWN_LABELS; + if(!rrd_flag_check(ri, RRD_FLAG_OWN_LABELS)) { ri->rrdlabels = rrdlabels_create(); rrdlabels_copy(ri->rrdlabels, st->state->chart_labels); + rrd_flag_set(ri, RRD_FLAG_OWN_LABELS); } ri->rrdset = NULL; - ri->flags &= ~RRD_FLAG_DONT_PROCESS; - rrdinstance_trigger_updates(ri, false, true, __FUNCTION__); - ri->flags |= RRD_FLAG_DONT_PROCESS; + rrdinstance_trigger_updates(ri, __FUNCTION__ ); rrdinstance_release(st->rrdinstance); st->rrdinstance = NULL; @@ -1206,7 +1062,7 @@ static inline void rrdinstance_updated_rrdset_name(RRDSET *st) { string_freez(old); rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_NAME); - rrdinstance_trigger_updates(ri, false, true, __FUNCTION__); + rrdinstance_trigger_updates(ri, __FUNCTION__ ); } } @@ -1215,13 +1071,17 @@ static inline void rrdinstance_updated_rrdset_flags_no_action(RRDINSTANCE *ri, R fatal("RRDCONTEXT: instance '%s' is not linked to chart '%s' on host '%s'", string2str(ri->id), rrdset_id(st), rrdhost_hostname(st->rrdhost)); - if(unlikely((rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)) && !(ri->flags & RRD_FLAG_HIDDEN))) { - ri->flags |= RRD_FLAG_HIDDEN; - rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS); - } - else if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_HIDDEN)) && (ri->flags & RRD_FLAG_HIDDEN))) { - ri->flags &= ~RRD_FLAG_HIDDEN; - rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS); + bool st_is_hidden = rrdset_flag_check(st, RRDSET_FLAG_HIDDEN); + bool ri_is_hidden = rrd_flag_check(ri, RRD_FLAG_HIDDEN); + + if(unlikely(st_is_hidden != ri_is_hidden)) { + if (unlikely(st_is_hidden && !ri_is_hidden)) + rrd_flag_set_updated(ri, RRD_FLAG_HIDDEN | RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS); + + else if (unlikely(!st_is_hidden && ri_is_hidden)) { + rrd_flag_clear(ri, RRD_FLAG_HIDDEN); + rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS); + } } } @@ -1234,9 +1094,7 @@ static inline void rrdinstance_updated_rrdset_flags(RRDSET *st) { rrdinstance_updated_rrdset_flags_no_action(ri, st); - ri->flags &= ~RRD_FLAG_DONT_PROCESS; - rrdinstance_trigger_updates(ri, false, true, __FUNCTION__); - ri->flags |= RRD_FLAG_DONT_PROCESS; + rrdinstance_trigger_updates(ri, __FUNCTION__ ); } static inline void rrdinstance_collected_rrdset(RRDSET *st) { @@ -1245,16 +1103,13 @@ static inline void rrdinstance_collected_rrdset(RRDSET *st) { rrdinstance_updated_rrdset_flags_no_action(ri, st); - if(dictionary_stats_entries(ri->rrdmetrics) > 0) { - - if(unlikely(!rrd_flag_is_collected(ri))) - rrd_flag_set_collected(ri); + if(unlikely(ri->internal.collected_metrics && !rrd_flag_is_collected(ri))) + rrd_flag_set_collected(ri); - if(unlikely(ri->flags & RRD_FLAG_DONT_PROCESS)) - ri->flags &= ~RRD_FLAG_DONT_PROCESS; + // we use this variable to detect BEGIN/END without SET + ri->internal.collected_metrics = 0; - rrdinstance_trigger_updates(ri, false, true, __FUNCTION__); - } + rrdinstance_trigger_updates(ri, __FUNCTION__ ); } // ---------------------------------------------------------------------------- @@ -1267,131 +1122,13 @@ static void rrdcontext_freez(RRDCONTEXT *rc) { string_freez(rc->family); } -static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc) { - time_t now = now_realtime_sec(); - uint64_t version = MAX(rc->version, rc->hub.version); - version = MAX((uint64_t)now, version); - version++; - return version; -} +static void rrdcontext_insert_callback(const char *id, void *value, void *data) { + (void)id; + RRDHOST *host = (RRDHOST *)data; + RRDCONTEXT *rc = (RRDCONTEXT *)value; -static void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused, void *bundle __maybe_unused) { - - // save it, so that we know the last version we sent to hub - rc->version = rc->hub.version = rrdcontext_get_next_version(rc); - rc->hub.id = string2str(rc->id); - rc->hub.title = string2str(rc->title); - rc->hub.units = string2str(rc->units); - rc->hub.family = string2str(rc->family); - rc->hub.chart_type = rrdset_type_name(rc->chart_type); - rc->hub.priority = rc->priority; - rc->hub.first_time_t = rc->first_time_t; - rc->hub.last_time_t = rrd_flag_is_collected(rc) ? 0 : rc->last_time_t; - rc->hub.deleted = (rc->flags & RRD_FLAG_DELETED) ? true : false; - -#ifdef ENABLE_ACLK - struct context_updated message = { - .id = rc->hub.id, - .version = rc->hub.version, - .title = rc->hub.title, - .units = rc->hub.units, - .family = rc->hub.family, - .chart_type = rc->hub.chart_type, - .priority = rc->hub.priority, - .first_entry = rc->hub.first_time_t, - .last_entry = rc->hub.last_time_t, - .deleted = rc->hub.deleted, - }; - - if(likely(!(rc->flags & RRD_FLAG_HIDDEN))) { - if (snapshot) { - if (!rc->hub.deleted) - contexts_snapshot_add_ctx_update(bundle, &message); - } - else - contexts_updated_add_ctx_update(bundle, &message); - } -#endif - - // store it to SQL - - if(rc->flags & RRD_FLAG_DELETED) { - rrdcontext_delete_from_sql_unsafe(rc); - } - else { - if (ctx_store_context(&rc->rrdhost->host_uuid, &rc->hub) != 0) - error("RRDCONTEXT: failed to save context '%s' version %"PRIu64" to SQL.", rc->hub.id, rc->hub.version); - } -} - -static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused) { - bool id_changed = false, - title_changed = false, - units_changed = false, - family_changed = false, - chart_type_changed = false, - priority_changed = false, - first_time_changed = false, - last_time_changed = false, - deleted_changed = false; - - if(unlikely(string2str(rc->id) != rc->hub.id)) - id_changed = true; - - if(unlikely(string2str(rc->title) != rc->hub.title)) - title_changed = true; - - if(unlikely(string2str(rc->units) != rc->hub.units)) - units_changed = true; - - if(unlikely(string2str(rc->family) != rc->hub.family)) - family_changed = true; - - if(unlikely(rrdset_type_name(rc->chart_type) != rc->hub.chart_type)) - chart_type_changed = true; - - if(unlikely(rc->priority != rc->hub.priority)) - priority_changed = true; - - if(unlikely((uint64_t)rc->first_time_t != rc->hub.first_time_t)) - first_time_changed = true; - - if(unlikely((uint64_t)(rrd_flag_is_collected(rc) ? 0 : rc->last_time_t) != rc->hub.last_time_t)) - last_time_changed = true; - - if(unlikely(((rc->flags & RRD_FLAG_DELETED) ? true : false) != rc->hub.deleted)) - deleted_changed = true; - - if(unlikely(id_changed || title_changed || units_changed || family_changed || chart_type_changed || priority_changed || first_time_changed || last_time_changed || deleted_changed)) { - - internal_error(true, "RRDCONTEXT: %s NEW VERSION '%s'%s, version %"PRIu64", title '%s'%s, units '%s'%s, family '%s'%s, chart type '%s'%s, priority %u%s, first_time_t %ld%s, last_time_t %ld%s, deleted '%s'%s, (queued for %llu ms, expected %llu ms)", - sending?"SENDING":"QUEUE", - string2str(rc->id), id_changed ? " (CHANGED)" : "", - rc->version, - string2str(rc->title), title_changed ? " (CHANGED)" : "", - string2str(rc->units), units_changed ? " (CHANGED)" : "", - string2str(rc->family), family_changed ? " (CHANGED)" : "", - rrdset_type_name(rc->chart_type), chart_type_changed ? " (CHANGED)" : "", - rc->priority, priority_changed ? " (CHANGED)" : "", - rc->first_time_t, first_time_changed ? " (CHANGED)" : "", - rrd_flag_is_collected(rc) ? 0 : rc->last_time_t, last_time_changed ? " (CHANGED)" : "", - (rc->flags & RRD_FLAG_DELETED) ? "true" : "false", deleted_changed ? " (CHANGED)" : "", - sending ? (now_realtime_usec() - rc->queue.queued_ut) / USEC_PER_MS : 0, - sending ? (rc->queue.scheduled_dispatch_ut - rc->queue.queued_ut) / USEC_PER_SEC : 0 - ); - return true; - } - - return false; -} - -static void rrdcontext_insert_callback(const char *id, void *value, void *data) { - (void)id; - RRDHOST *host = (RRDHOST *)data; - RRDCONTEXT *rc = (RRDCONTEXT *)value; - - rc->rrdhost = host; - rc->flags = rc->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; + rc->rrdhost = host; + rc->flags = rc->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics at constructor if(rc->hub.version) { // we are loading data from the SQL database @@ -1428,7 +1165,7 @@ static void rrdcontext_insert_callback(const char *id, void *value, void *data) rc->last_time_t = rc->hub.last_time_t; if(rc->hub.deleted || !rc->hub.first_time_t) - rrd_flag_set_deleted(rc, 0); + rrd_flag_set_deleted(rc, RRD_FLAG_NONE); else { if (rc->last_time_t == 0) rrd_flag_set_collected(rc); @@ -1436,14 +1173,14 @@ static void rrdcontext_insert_callback(const char *id, void *value, void *data) rrd_flag_set_archived(rc); } - rc->flags |= RRD_FLAG_UPDATE_REASON_LOAD_SQL; + rc->flags |= RRD_FLAG_UPDATE_REASON_LOAD_SQL; // no need for atomics at constructor } else { // we are adding this context now for the first time rc->version = now_realtime_sec(); } - rrdinstances_create(rc); + rrdinstances_create_in_rrdcontext(rc); netdata_mutex_init(&rc->mutex); // signal the react callback to do the job @@ -1457,7 +1194,7 @@ static void rrdcontext_delete_callback(const char *id, void *value, void *data) RRDCONTEXT *rc = (RRDCONTEXT *)value; - rrdinstances_destroy(rc); + rrdinstances_destroy_from_rrdcontext(rc); netdata_mutex_destroy(&rc->mutex); rrdcontext_freez(rc); } @@ -1515,13 +1252,13 @@ static void rrdcontext_conflict_callback(const char *id, void *oldv, void *newv, rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY); } - rc->flags |= (rc_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); + rrd_flag_set(rc, rc_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no need for atomics on rc_new if(rrd_flag_is_collected(rc) && rrd_flag_is_archived(rc)) rrd_flag_set_collected(rc); - if(rc->flags & RRD_FLAG_UPDATED) - rc->flags |= RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT; + if(rrd_flag_is_updated(rc)) + rrd_flag_set(rc, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT); rrdcontext_unlock(rc); @@ -1534,7 +1271,12 @@ static void rrdcontext_conflict_callback(const char *id, void *oldv, void *newv, static void rrdcontext_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) { RRDCONTEXT *rc = (RRDCONTEXT *)value; - rrdcontext_trigger_updates(rc, false, __FUNCTION__); + rrdcontext_trigger_updates(rc, __FUNCTION__ ); +} + +static void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function) { + if(rrd_flag_is_updated(rc) || !rrd_flag_check(rc, RRD_FLAG_LIVE_RETENTION)) + rrdcontext_queue_for_post_processing(rc, function, rc->flags); } void rrdhost_create_rrdcontexts(RRDHOST *host) { @@ -1550,187 +1292,45 @@ void rrdhost_create_rrdcontexts(RRDHOST *host) { dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx, rrdcontext_conflict_callback, (void *)host); dictionary_register_react_callback((DICTIONARY *)host->rrdctx, rrdcontext_react_callback, (void *)host); - host->rrdctx_queue = (RRDCONTEXTS *)dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE | DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE); + host->rrdctx_hub_queue = (RRDCONTEXTS *)dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE | DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE); + host->rrdctx_post_processing_queue = (RRDCONTEXTS *)dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE | DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE); } void rrdhost_destroy_rrdcontexts(RRDHOST *host) { if(unlikely(!host)) return; if(unlikely(!host->rrdctx)) return; - if(host->rrdctx_queue) { - dictionary_destroy((DICTIONARY *)host->rrdctx_queue); - host->rrdctx_queue = NULL; - } - - dictionary_destroy((DICTIONARY *)host->rrdctx); - host->rrdctx = NULL; -} - -static inline bool rrdcontext_should_be_deleted(RRDCONTEXT *rc) { - if(likely(!(rc->flags & RRD_FLAG_DELETED))) - return false; - - if(likely(!(rc->flags & RRD_FLAG_LIVE_RETENTION))) - return false; - - if(unlikely(rc->flags & RRD_FLAGS_PREVENTING_DELETIONS)) - return false; - - if(unlikely(dictionary_stats_referenced_items(rc->rrdinstances) != 0)) - return false; - - if(unlikely(dictionary_stats_entries(rc->rrdinstances) != 0)) - return false; - - if(unlikely(rc->first_time_t || rc->last_time_t)) - return false; - - return true; -} - -static void rrdcontext_trigger_updates(RRDCONTEXT *rc, bool force, const char *function __maybe_unused) { - if(unlikely(rc->flags & RRD_FLAG_DONT_PROCESS)) return; - if(unlikely(!force && !(rc->flags & RRD_FLAG_UPDATED))) return; - - // logs and stats - log_transition(NULL, NULL, rc, function); - rrdcontext_triggered_update_on_rrdcontext(); - - rrdcontext_lock(rc); - - size_t min_priority = LONG_MAX; - time_t min_first_time_t = LONG_MAX, max_last_time_t = 0; - size_t instances_active = 0, instances_deleted = 0; - bool live_retention = true, currently_collected = false, hidden = true; - if(dictionary_stats_entries(rc->rrdinstances) > 0) { - RRDINSTANCE *ri; - dfe_start_read(rc->rrdinstances, ri) { - if(likely(!(ri->flags & RRD_FLAG_HIDDEN))) - hidden = false; - - if(!(ri->flags & RRD_FLAG_LIVE_RETENTION)) - live_retention = false; - - if (unlikely(rrdinstance_should_be_deleted(ri))) { - instances_deleted++; - rrd_flag_unset_updated(ri); - continue; - } - - if(ri->flags & RRD_FLAG_COLLECTED && ri->first_time_t) - currently_collected = true; - - internal_error(rc->units != ri->units, - "RRDCONTEXT: '%s' rrdinstance '%s' has different units, context '%s', instance '%s'", - string2str(rc->id), string2str(ri->id), - string2str(rc->units), string2str(ri->units)); - - instances_active++; - - if (ri->priority >= RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY && ri->priority < min_priority) - min_priority = ri->priority; - - if (ri->first_time_t && ri->first_time_t < min_first_time_t) - min_first_time_t = ri->first_time_t; - - if (ri->last_time_t && ri->last_time_t > max_last_time_t) - max_last_time_t = ri->last_time_t; - - rrd_flag_unset_updated(ri); - } - dfe_done(ri); - } - - if(hidden && !(rc->flags & RRD_FLAG_HIDDEN)) - rc->flags |= RRD_FLAG_HIDDEN; - else if(!hidden && (rc->flags & RRD_FLAG_HIDDEN)) - rc->flags &= ~RRD_FLAG_HIDDEN; - - if(live_retention && !(rc->flags & RRD_FLAG_LIVE_RETENTION)) - rc->flags |= RRD_FLAG_LIVE_RETENTION; - else if(!live_retention && (rc->flags & RRD_FLAG_LIVE_RETENTION)) - rc->flags &= ~RRD_FLAG_LIVE_RETENTION; - - if(unlikely(!instances_active)) { - // we had some instances, but they are gone now... - - if(rc->first_time_t) { - rc->first_time_t = 0; - rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); - } - - if(rc->last_time_t) { - rc->last_time_t = 0; - rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); - } - - rrd_flag_set_deleted(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); - } - else { - // we have some active instances... - - if (unlikely(min_first_time_t == LONG_MAX)) - min_first_time_t = 0; - - if (unlikely(min_first_time_t == 0 && max_last_time_t == 0)) { - if(rc->first_time_t) { - rc->first_time_t = 0; - rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); - } - - if(rc->last_time_t) { - rc->last_time_t = 0; - rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); - } - - rrd_flag_set_deleted(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); - } - else { - rc->flags &= ~RRD_FLAG_UPDATE_REASON_ZERO_RETENTION; - - if (unlikely(rc->first_time_t != min_first_time_t)) { - rc->first_time_t = min_first_time_t; - rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); - } - - if (rc->last_time_t != max_last_time_t) { - rc->last_time_t = max_last_time_t; - rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); - } + DICTIONARY *old; - if(likely(currently_collected)) - rrd_flag_set_collected(rc); - else - rrd_flag_set_archived(rc); - } + if(host->rrdctx_hub_queue) { + old = (DICTIONARY *)host->rrdctx_hub_queue; + host->rrdctx_hub_queue = NULL; - if (min_priority != LONG_MAX && rc->priority != min_priority) { - rc->priority = min_priority; - rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY); + RRDCONTEXT *rc; + dfe_start_write(old, rc) { + dictionary_del_having_write_lock(old, string2str(rc->id)); + rrdset_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB); } + dfe_done(rc); + dictionary_destroy(old); } - if(unlikely(rc->flags & RRD_FLAG_UPDATED)) { - if(check_if_cloud_version_changed_unsafe(rc, false)) { - rc->version = rrdcontext_get_next_version(rc); - - if(rc->flags & RRD_FLAG_QUEUED) { - rc->queue.queued_ut = now_realtime_usec(); - rc->queue.queued_flags |= rc->flags; - } - else { - rc->queue.queued_ut = now_realtime_usec(); - rc->queue.queued_flags = rc->flags; + if(host->rrdctx_post_processing_queue) { + old = (DICTIONARY *)host->rrdctx_post_processing_queue; + host->rrdctx_post_processing_queue = NULL; - rc->flags |= RRD_FLAG_QUEUED; - dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx_queue, string2str(rc->id), rc, sizeof(*rc)); - } + RRDCONTEXT *rc; + dfe_start_write(old, rc) { + dictionary_del_having_write_lock(old, string2str(rc->id)); + rrdset_flag_clear(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING); } - - rrd_flag_unset_updated(rc); + dfe_done(rc); + dictionary_destroy(old); } - rrdcontext_unlock(rc); + old = (DICTIONARY *)host->rrdctx; + host->rrdctx = NULL; + dictionary_destroy(old); } // ------------