diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-09-06 19:02:39 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-06 19:02:39 +0300 |
commit | 58c79fd329df7d2187e4aee56fb4a58a9c02c3ae (patch) | |
tree | b48bde3c183e1fa7b164e35a12e5ad87c73e001e /database | |
parent | 64a6920038d5f185710154790ff49e4913caac83 (diff) |
Faster rrdcontext (#13629)
* moved rrdcontexts processing to worker thread
* added loggings
* check for aclk deeper in the code
* removed unessesary logs
* code re-organization; cleanup; more comments; better error handling; rrdcontext locks optimization; more clarity
* updated 2 comments
* make instances walkthrough reentrant; move context lock to the place is really needed
* created macro for reentrant dictionary walkthrough
* incremental updates on instances and metrics
* renamed family of rrdcontext workers
* prevent crash in case RRDINSTANCE or RRDMETRIC is freed during shutdown
* prevent crash during rrddim save, on out of memory fatal()
* always post-process contexts
* added tracing for tracking the caller that trigger updates
* more details on tracing info
* fix for charts that are collected without metrics
Diffstat (limited to 'database')
-rw-r--r-- | database/rrd.h | 3 | ||||
-rw-r--r-- | database/rrdcontext.c | 1948 | ||||
-rw-r--r-- | database/rrddim.c | 8 | ||||
-rw-r--r-- | database/rrdvar.c | 2 |
4 files changed, 1060 insertions, 901 deletions
diff --git a/database/rrd.h b/database/rrd.h index 36c4e5d58f..4c3b13dfb1 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -916,7 +916,8 @@ struct rrdhost { STORAGE_INSTANCE *storage_instance[RRD_STORAGE_TIERS]; // the database instances of the storage tiers - RRDCONTEXTS *rrdctx_queue; + RRDCONTEXTS *rrdctx_hub_queue; + RRDCONTEXTS *rrdctx_post_processing_queue; RRDCONTEXTS *rrdctx; uuid_t host_uuid; // Global GUID for this host diff --git a/database/rrdcontext.c b/database/rrdcontext.c index b3af8b44cc..dcf8b94e22 100644 --- a/database/rrdcontext.c +++ b/database/rrdcontext.c @@ -10,10 +10,23 @@ int rrdcontext_enabled = CONFIG_BOOLEAN_YES; #define MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST 5000 #define FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS 120 -#define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_SECS 1 +#define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC (1000 * USEC_PER_MS) #define RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY 10 -// #define LOG_TRANSITIONS 1 +#define WORKER_JOB_HOSTS 1 +#define WORKER_JOB_CHECK 2 +#define WORKER_JOB_SEND 3 +#define WORKER_JOB_DEQUEUE 4 +#define WORKER_JOB_RETENTION 5 +#define WORKER_JOB_QUEUED 6 +#define WORKER_JOB_CLEANUP 7 +#define WORKER_JOB_CLEANUP_DELETE 8 +#define WORKER_JOB_PP_METRIC 9 // post-processing metrics +#define WORKER_JOB_PP_INSTANCE 10 // post-processing instances +#define WORKER_JOB_PP_CONTEXT 11 // post-processing contexts +#define WORKER_JOB_HUB_QUEUE_SIZE 12 +#define WORKER_JOB_PP_QUEUE_SIZE 13 + typedef enum { RRD_FLAG_NONE = 0, @@ -23,10 +36,11 @@ typedef enum { RRD_FLAG_ARCHIVED = (1 << 3), // this object is not currently being collected RRD_FLAG_OWN_LABELS = (1 << 4), // this instance has its own labels - not linked to an RRDSET RRD_FLAG_LIVE_RETENTION = (1 << 5), // we have got live retention from the database - RRD_FLAG_QUEUED = (1 << 6), // this context is currently queued to be dispatched to hub - RRD_FLAG_DONT_PROCESS = (1 << 7), // don't process updates for this object + RRD_FLAG_QUEUED_FOR_HUB = (1 << 6), // this context is currently queued to be dispatched to hub + RRD_FLAG_QUEUED_FOR_POST_PROCESSING = (1 << 7), // this context is currently queued to be post-processed RRD_FLAG_HIDDEN = (1 << 8), // don't expose this to the hub or the API + RRD_FLAG_UPDATE_REASON_TRIGGERED = (1 << 9), // the update was triggered by the child object RRD_FLAG_UPDATE_REASON_LOAD_SQL = (1 << 10), // this object has just been loaded from SQL RRD_FLAG_UPDATE_REASON_NEW_OBJECT = (1 << 11), // this object has just been created RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT = (1 << 12), // we received an update on this object @@ -54,7 +68,8 @@ typedef enum { } RRD_FLAGS; #define RRD_FLAG_ALL_UPDATE_REASONS ( \ - RRD_FLAG_UPDATE_REASON_LOAD_SQL \ + RRD_FLAG_UPDATE_REASON_TRIGGERED \ + |RRD_FLAG_UPDATE_REASON_LOAD_SQL \ |RRD_FLAG_UPDATE_REASON_NEW_OBJECT \ |RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT \ |RRD_FLAG_UPDATE_REASON_CHANGED_LINKING \ @@ -79,51 +94,110 @@ typedef enum { #define RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS ( \ RRD_FLAG_ARCHIVED \ - |RRD_FLAG_DONT_PROCESS \ |RRD_FLAG_HIDDEN \ |RRD_FLAG_ALL_UPDATE_REASONS \ ) +#define RRD_FLAGS_REQUIRED_FOR_DELETIONS ( \ + RRD_FLAG_DELETED \ + |RRD_FLAG_LIVE_RETENTION \ +) + #define RRD_FLAGS_PREVENTING_DELETIONS ( \ - RRD_FLAG_QUEUED \ + RRD_FLAG_QUEUED_FOR_HUB \ |RRD_FLAG_COLLECTED \ + \ + /* RRD_FLAG_QUEUED_FOR_POST_PROCESSING */ \ + /* should not be here or nothing will be deleted */ \ ) -#define rrd_flag_set_updated(obj, reason) (obj)->flags |= (RRD_FLAG_UPDATED | (reason)) -#define rrd_flag_unset_updated(obj) (obj)->flags &= ~(RRD_FLAG_UPDATED | RRD_FLAG_ALL_UPDATE_REASONS) - -#define rrd_flag_set_collected(obj) do { \ - if(likely( !((obj)->flags & RRD_FLAG_COLLECTED))) \ - (obj)->flags |= (RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED | RRD_FLAG_UPDATED); \ - if(likely( ((obj)->flags & (RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED)))) \ - (obj)->flags &= ~(RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED); \ - if(unlikely(((obj)->flags & (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION)))) \ - (obj)->flags &= ~(RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); \ - if(unlikely(((obj)->flags & RRD_FLAG_DONT_PROCESS))) \ - (obj)->flags &= ~RRD_FLAG_DONT_PROCESS; \ -} while(0) - -#define rrd_flag_set_archived(obj) do { \ - if(likely( !((obj)->flags & RRD_FLAG_ARCHIVED))) \ - (obj)->flags |= (RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED | RRD_FLAG_UPDATED); \ - if(likely( ((obj)->flags & (RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED)))) \ - (obj)->flags &= ~(RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED); \ - if(unlikely(((obj)->flags & (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION)))) \ - (obj)->flags &= ~(RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); \ -} while(0) - -#define rrd_flag_set_deleted(obj, reason) do { \ - if(likely( !((obj)->flags & RRD_FLAG_DELETED))) \ - (obj)->flags |= (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION | RRD_FLAG_UPDATED | (reason)); \ - if(unlikely(((obj)->flags & RRD_FLAG_ARCHIVED))) \ - (obj)->flags &= ~RRD_FLAG_ARCHIVED; \ - if(likely( ((obj)->flags & RRD_FLAG_COLLECTED))) \ - (obj)->flags &= ~RRD_FLAG_COLLECTED; \ -} while(0) - - -#define rrd_flag_is_collected(obj) ((obj)->flags & RRD_FLAG_COLLECTED) -#define rrd_flag_is_archived(obj) ((obj)->flags & RRD_FLAG_ARCHIVED) +// get all the flags of an object +#define rrd_flags_get(obj) __atomic_load_n(&((obj)->flags), __ATOMIC_SEQ_CST) + +// check if ANY of the given flags (bits) is set +#define rrd_flag_check(obj, flag) (rrd_flags_get(obj) & (flag)) + +// check if ALL of the given flags (bits) are set +#define rrd_flag_check_all(obj, flag) (rrd_flag_check(obj, flag) == (flag)) + +// set one or more flags (bits) +#define rrd_flag_set(obj, flag) __atomic_or_fetch(&((obj)->flags), flag, __ATOMIC_SEQ_CST) + +// clear one or more flags (bits) +#define rrd_flag_clear(obj, flag) __atomic_and_fetch(&((obj)->flags), ~(flag), __ATOMIC_SEQ_CST) + +// replace the flags of an object, with the supplied ones +#define rrd_flags_replace(obj, all_flags) __atomic_store_n(&((obj)->flags), all_flags, __ATOMIC_SEQ_CST) + +static inline void +rrd_flag_add_remove_atomic(RRD_FLAGS *flags, RRD_FLAGS check, RRD_FLAGS conditionally_add, RRD_FLAGS always_remove) { + RRD_FLAGS expected, desired; + do { + expected = *flags; + + desired = expected; + desired &= ~(always_remove); + + if(!(expected & check)) + desired |= (check | conditionally_add); + + } while(!__atomic_compare_exchange_n(flags, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)); +} + +#define rrd_flag_set_collected(obj) \ + rrd_flag_add_remove_atomic(&((obj)->flags) \ + /* check this flag */ \ + , RRD_FLAG_COLLECTED \ + \ + /* add these flags together with the above, if the above is not already set */ \ + , RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED | RRD_FLAG_UPDATED \ + \ + /* always remove these flags */ \ + , RRD_FLAG_ARCHIVED \ + | RRD_FLAG_DELETED \ + | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED \ + | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \ + ) + +#define rrd_flag_set_archived(obj) \ + rrd_flag_add_remove_atomic(&((obj)->flags) \ + /* check this flag */ \ + , RRD_FLAG_ARCHIVED \ + \ + /* add these flags together with the above, if the above is not already set */ \ + , RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED | RRD_FLAG_UPDATED \ + \ + /* always remove these flags */ \ + , RRD_FLAG_COLLECTED \ + | RRD_FLAG_DELETED \ + | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED \ + | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \ + ) + +#define rrd_flag_set_deleted(obj, reason) \ + rrd_flag_add_remove_atomic(&((obj)->flags) \ + /* check this flag */ \ + , RRD_FLAG_DELETED \ + \ + /* add these flags together with the above, if the above is not already set */ \ + , RRD_FLAG_UPDATE_REASON_ZERO_RETENTION | RRD_FLAG_UPDATED | (reason) \ + \ + /* always remove these flags */ \ + , RRD_FLAG_ARCHIVED \ + | RRD_FLAG_DELETED \ + ) + +#define rrd_flag_is_collected(obj) rrd_flag_check(obj, RRD_FLAG_COLLECTED) +#define rrd_flag_is_archived(obj) rrd_flag_check(obj, RRD_FLAG_ARCHIVED) +#define rrd_flag_is_deleted(obj) rrd_flag_check(obj, RRD_FLAG_DELETED) +#define rrd_flag_is_updated(obj) rrd_flag_check(obj, RRD_FLAG_UPDATED) + +// mark an object as updated, providing reasons (additional bits) +#define rrd_flag_set_updated(obj, reason) rrd_flag_set(obj, RRD_FLAG_UPDATED | (reason)) + +// clear an object as being updated, clearing also all the reasons +#define rrd_flag_unset_updated(obj) rrd_flag_clear(obj, RRD_FLAG_UPDATED | RRD_FLAG_ALL_UPDATE_REASONS) + static struct rrdcontext_reason { RRD_FLAGS flag; @@ -131,6 +205,7 @@ static struct rrdcontext_reason { usec_t delay_ut; } rrdcontext_reasons[] = { // context related + { RRD_FLAG_UPDATE_REASON_TRIGGERED, "triggered transition", 60 * USEC_PER_SEC }, { RRD_FLAG_UPDATE_REASON_NEW_OBJECT, "object created", 60 * USEC_PER_SEC }, { RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT, "object updated", 60 * USEC_PER_SEC }, { RRD_FLAG_UPDATE_REASON_LOAD_SQL, "loaded from sql", 60 * USEC_PER_SEC }, @@ -173,8 +248,6 @@ typedef struct rrdmetric { RRD_FLAGS flags; struct rrdinstance *ri; - - usec_t created_ut; // the time this object was created } RRDMETRIC; typedef struct rrdinstance { @@ -199,6 +272,12 @@ typedef struct rrdinstance { struct rrdcontext *rc; DICTIONARY *rrdmetrics; + + struct { + uint32_t collected_metrics; // a temporary variable to detect BEGIN/END without SET + // don't use it for other purposes + // it goes up and then resets to zero, on every iteration + } internal; } RRDINSTANCE; typedef struct rrdcontext { @@ -287,31 +366,38 @@ static inline void rrdcontext_release(RRDCONTEXT_ACQUIRED *rca) { dictionary_acquired_item_release((DICTIONARY *)rc->rrdhost->rrdctx, (DICTIONARY_ITEM *)rca); } -static void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, int job_id); -static void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, int job_id); +static void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, bool worker_jobs); +static void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, bool worker_jobs); #define rrdcontext_version_hash(host) rrdcontext_version_hash_with_callback(host, NULL, false, NULL) static uint64_t rrdcontext_version_hash_with_callback(RRDHOST *host, void (*callback)(RRDCONTEXT *, bool, void *), bool snapshot, void *bundle); -static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker); -static void rrdcontext_garbage_collect(void); +static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jobs); +static void rrdcontext_garbage_collect_for_all_hosts(void); void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc); #define rrdcontext_lock(rc) netdata_mutex_lock(&((rc)->mutex)) #define rrdcontext_unlock(rc) netdata_mutex_unlock(&((rc)->mutex)) // ---------------------------------------------------------------------------- -// Updates triggers +// Forward definitions + +static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc); +static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused); +static void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused, void *bundle __maybe_unused); -static void rrdmetric_trigger_updates(RRDMETRIC *rm, bool force, bool escalate, const char *function); -static void rrdinstance_trigger_updates(RRDINSTANCE *ri, bool force, bool escalate, const char *function); -static void rrdcontext_trigger_updates(RRDCONTEXT *rc, bool force, const char *function); +static void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function, RRD_FLAGS flags); +static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs); + +static void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function); +static void rrdinstance_trigger_updates(RRDINSTANCE *ri, const char *function); +static void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function); // ---------------------------------------------------------------------------- // visualizing flags static void rrd_flags_to_buffer(RRD_FLAGS flags, BUFFER *wb) { - if(flags & RRD_FLAG_QUEUED) + if(flags & RRD_FLAG_QUEUED_FOR_HUB) buffer_strcat(wb, "QUEUED "); if(flags & RRD_FLAG_DELETED) @@ -332,11 +418,11 @@ static void rrd_flags_to_buffer(RRD_FLAGS flags, BUFFER *wb) { if(flags & RRD_FLAG_LIVE_RETENTION) buffer_strcat(wb, "LIVE_RETENTION "); - if(flags & RRD_FLAG_DONT_PROCESS) - buffer_strcat(wb, "DONT_PROCESS "); - if(flags & RRD_FLAG_HIDDEN) buffer_strcat(wb, "HIDDEN "); + + if(flags & RRD_FLAG_QUEUED_FOR_POST_PROCESSING) + buffer_strcat(wb, "PENDING_UPDATES "); } static void rrd_reasons_to_buffer(RRD_FLAGS flags, BUFFER *wb) { @@ -351,54 +437,10 @@ static void rrd_reasons_to_buffer(RRD_FLAGS flags, BUFFER *wb) { } // ---------------------------------------------------------------------------- -// logging of all data collected - -#ifdef LOG_TRANSITIONS -static void log_transition(RRDMETRIC *rm, RRDINSTANCE *ri, RRDCONTEXT *rc, const char *function) { - BUFFER *wb = buffer_create(1000); - const char *triggered_on = "triggered on "; - - buffer_sprintf(wb, "RRD TRANSITION: %s() ", function); - - if(rm) { - buffer_sprintf(wb, "%smetric '%s' of ", triggered_on, string2str(rm->id)); - triggered_on = ""; - } - - if(ri) { - buffer_sprintf(wb, "%sinstance '%s' of ", triggered_on, string2str(ri->id)); - triggered_on = ""; - } - - buffer_sprintf(wb, "%scontext '%s' ", triggered_on, string2str(rc->id)); - - RRD_FLAGS flags = rc->flags; - const char *we_are = "context"; - if(ri) { - flags = ri->flags; - we_are = "instance"; - } - if(rm) { - flags = rm->flags; - we_are = "metric"; - } - - buffer_sprintf(wb, "%s flags: ", we_are); - rrd_flags_to_buffer(flags, wb); - - buffer_strcat(wb, ", having reasons: "); - rrd_reasons_to_buffer(flags, wb); - - internal_error(true, "%s", buffer_tostring(wb)); - buffer_free(wb); -} -#else -#define log_transition(rm, ri, rc, function) debug_dummy() -#endif - -// ---------------------------------------------------------------------------- // RRDMETRIC +// free the contents of RRDMETRIC. +// RRDMETRIC itself is managed by DICTIONARY - no need to free it here. static void rrdmetric_free(RRDMETRIC *rm) { string_freez(rm->id); string_freez(rm->name); @@ -408,60 +450,8 @@ static void rrdmetric_free(RRDMETRIC *rm) { rm->ri = NULL; } -static void rrdmetric_update_retention(RRDMETRIC *rm) { - time_t min_first_time_t = LONG_MAX, max_last_time_t = 0; - - if(rm->rrddim) { - min_first_time_t = rrddim_first_entry_t(rm->rrddim); - max_last_time_t = rrddim_last_entry_t(rm->rrddim); - } -#ifdef ENABLE_DBENGINE - else { - RRDHOST *rrdhost = rm->ri->rc->rrdhost; - for (int tier = 0; tier < storage_tiers; tier++) { - if(!rrdhost->storage_instance[tier]) continue; - - time_t first_time_t, last_time_t; - if (rrdeng_metric_retention_by_uuid(rrdhost->storage_instance[tier], &rm->uuid, &first_time_t, &last_time_t) == 0) { - if (first_time_t < min_first_time_t) - min_first_time_t = first_time_t; - - if (last_time_t > max_last_time_t) - max_last_time_t = last_time_t; - } - } - } -#endif - - if(min_first_time_t == LONG_MAX) - min_first_time_t = 0; - - if(min_first_time_t > max_last_time_t) { - internal_error(true, "RRDMETRIC: retention of '%s' is flipped", string2str(rm->id)); - time_t tmp = min_first_time_t; - min_first_time_t = max_last_time_t; - max_last_time_t = tmp; - } - - // check if retention changed - - if (min_first_time_t != rm->first_time_t) { - rm->first_time_t = min_first_time_t; - rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T); - } - - if (max_last_time_t != rm->last_time_t) { - rm->last_time_t = max_last_time_t; - rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); - } - - if(unlikely(!rm->first_time_t && !rm->last_time_t)) - rrd_flag_set_deleted(rm, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); - - rm->flags |= RRD_FLAG_LIVE_RETENTION; -} - // called when this rrdmetric is inserted to the rrdmetrics dictionary of a rrdinstance +// the constructor of the rrdmetric object static void rrdmetric_insert_callback(const char *id __maybe_unused, void *value, void *data) { RRDMETRIC *rm = value; @@ -469,15 +459,14 @@ static void rrdmetric_insert_callback(const char *id __maybe_unused, void *value rm->ri = data; // remove flags that we need to figure out at runtime - rm->flags = rm->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; - - rm->created_ut = now_realtime_usec(); + rm->flags = rm->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics // signal the react callback to do the job rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_NEW_OBJECT); } // called when this rrdmetric is deleted from the rrdmetrics dictionary of a rrdinstance +// the destructor of the rrdmetric object static void rrdmetric_delete_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) { RRDMETRIC *rm = value; @@ -488,6 +477,7 @@ static void rrdmetric_delete_callback(const char *id __maybe_unused, void *value } // called when the same rrdmetric is inserted again to the rrdmetrics dictionary of a rrdinstance +// while this is called, the dictionary is write locked, but there may be other users of the object static void rrdmetric_conflict_callback(const char *id __maybe_unused, void *oldv, void *newv, void *data __maybe_unused) { RRDMETRIC *rm = oldv; RRDMETRIC *rm_new = newv; @@ -537,26 +527,27 @@ static void rrdmetric_conflict_callback(const char *id __maybe_unused, void *old rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T); } - rm->flags |= (rm_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); + rrd_flag_set(rm, rm_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no needs for atomics on rm_new if(rrd_flag_is_collected(rm) && rrd_flag_is_archived(rm)) rrd_flag_set_collected(rm); - if(rm->flags & RRD_FLAG_UPDATED) - rm->flags |= RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT; + if(rrd_flag_check(rm, RRD_FLAG_UPDATED)) + rrd_flag_set(rm, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT); rrdmetric_free(rm_new); // the react callback will continue from here } +// this is called after the insert or the conflict callbacks, +// but the dictionary is now unlocked static void rrdmetric_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) { RRDMETRIC *rm = value; - - rrdmetric_trigger_updates(rm, false, true, __FUNCTION__); + rrdmetric_trigger_updates(rm, __FUNCTION__ ); } -static void rrdmetrics_create(RRDINSTANCE *ri) { +static void rrdmetrics_create_in_rrdinstance(RRDINSTANCE *ri) { if(unlikely(!ri)) return; if(likely(ri->rrdmetrics)) return; @@ -567,51 +558,26 @@ static void rrdmetrics_create(RRDINSTANCE *ri) { dictionary_register_react_callback(ri->rrdmetrics, rrdmetric_react_callback, (void *)ri); } -static void rrdmetrics_destroy(RRDINSTANCE *ri) { +static void rrdmetrics_destroy_from_rrdinstance(RRDINSTANCE *ri) { if(unlikely(!ri || !ri->rrdmetrics)) return; dictionary_destroy(ri->rrdmetrics); ri->rrdmetrics = NULL; } -static inline bool rrdmetric_should_be_deleted(RRDMETRIC *rm) { - if(likely(!(rm->flags & RRD_FLAG_DELETED))) - return false; - - if(likely(!(rm->flags & RRD_FLAG_LIVE_RETENTION))) - return false; - - if(unlikely(rm->flags & RRD_FLAGS_PREVENTING_DELETIONS)) - return false; - - if(likely(rm->rrddim)) - return false; - - //if((now_realtime_usec() - rm->created_ut) < 600 * USEC_PER_SEC) - // return false; - - rrdmetric_update_retention(rm); - if(rm->first_time_t || rm->last_time_t) - return false; - - return true; -} - -static void rrdmetric_trigger_updates(RRDMETRIC *rm, bool force, bool escalate, const char *function __maybe_unused) { - if(likely(!force && !(rm->flags & RRD_FLAG_UPDATED))) return; - - // logs and statistics - log_transition(rm, rm->ri, rm->ri->rc, function); - rrdcontext_triggered_update_on_rrdmetric(); - - if(unlikely(rrd_flag_is_collected(rm)) && (!rm->rrddim || rm->flags & RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD)) +// trigger post-processing of the rrdmetric, escalating changes to the rrdinstance it belongs +static void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function) { + if(unlikely(rrd_flag_is_collected(rm)) && (!rm->rrddim || rrd_flag_check(rm, RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD))) rrd_flag_set_archived(rm); - rrdmetric_update_retention(rm); - - if(unlikely(escalate && rm->flags & RRD_FLAG_UPDATED)) - rrdinstance_trigger_updates(rm->ri, true, true, __FUNCTION__); + if(rrd_flag_is_updated(rm) || !rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION)) { + rrd_flag_set_updated(rm->ri, RRD_FLAG_UPDATE_REASON_TRIGGERED); + rrdcontext_queue_for_post_processing(rm->ri->rc, function, rm->flags); + } } +// ---------------------------------------------------------------------------- +// RRDMETRIC HOOKS ON RRDDIM + static inline void rrdmetric_from_rrddim(RRDDIM *rd) { if(unlikely(!rd->rrdset)) fatal("RRDMETRIC: rrddim '%s' does not have a rrdset.", rrddim_id(rd)); @@ -627,7 +593,7 @@ static inline void rrdmetric_from_rrddim(RRDDIM *rd) { RRDMETRIC trm = { .id = string_dup(rd->id), .name = string_dup(rd->name), - .flags = RRD_FLAG_NONE, + .flags = RRD_FLAG_NONE, // no need for atomics .rrddim = rd, }; uuid_copy(trm.uuid, rd->metric_uuid); @@ -648,6 +614,10 @@ static inline RRDMETRIC *rrddim_get_rrdmetric_with_trace(RRDDIM *rd, const char } RRDMETRIC *rm = rrdmetric_acquired_value(rd->rrdmetric); + if(unlikely(!rm)) { + error("RRDMETRIC: RRDDIM '%s' lost the link to its RRDMETRIC at %s()", rrddim_id(rd), function); + return NULL; + } if(unlikely(rm->rrddim != rd)) fatal("RRDMETRIC: '%s' is not linked to RRDDIM '%s' at %s()", string2str(rm->id), rrddim_id(rd), function); @@ -663,7 +633,7 @@ static inline void rrdmetric_rrddim_is_freed(RRDDIM *rd) { rrd_flag_set_archived(rm); rm->rrddim = NULL; - rrdmetric_trigger_updates(rm, false, true, __FUNCTION__); + rrdmetric_trigger_updates(rm, __FUNCTION__ ); rrdmetric_release(rd->rrdmetric); rd->rrdmetric = NULL; } @@ -677,7 +647,7 @@ static inline void rrdmetric_updated_rrddim_flags(RRDDIM *rd) { rrd_flag_set_archived(rm); } - rrdmetric_trigger_updates(rm, false, true, __FUNCTION__); + rrdmetric_trigger_updates(rm, __FUNCTION__ ); } static inline void rrdmetric_collected_rrddim(RRDDIM *rd) { @@ -687,7 +657,10 @@ static inline void rrdmetric_collected_rrddim(RRDDIM *rd) { if(unlikely(!rrd_flag_is_collected(rm))) rrd_flag_set_collected(rm); - rrdmetric_trigger_updates(rm, false, true, __FUNCTION__); + // we use this variable to detect BEGIN/END without SET + rm->ri->internal.collected_metrics++; + + rrdmetric_trigger_updates(rm, __FUNCTION__ ); } // ---------------------------------------------------------------------------- @@ -695,10 +668,10 @@ static inline void rrdmetric_collected_rrddim(RRDDIM *rd) { static void rrdinstance_free(RRDINSTANCE *ri) { - if(ri->flags & RRD_FLAG_OWN_LABELS) + if(rrd_flag_check(ri, RRD_FLAG_OWN_LABELS)) dictionary_destroy(ri->rrdlabels); - rrdmetrics_destroy(ri); + rrdmetrics_destroy_from_rrdinstance(ri); string_freez(ri->id); string_freez(ri->name); string_freez(ri->title); @@ -727,33 +700,32 @@ static void rrdinstance_insert_callback(const char *id __maybe_unused, void *val // link it to its parent ri->rc = data; - ri->flags = ri->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; + ri->flags = ri->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics if(!ri->name) ri->name = string_dup(ri->id); if(ri->rrdset && ri->rrdset->state) { ri->rrdlabels = ri->rrdset->state->chart_labels; - if(ri->flags & RRD_FLAG_OWN_LABELS) - ri->flags &= ~RRD_FLAG_OWN_LABELS; + ri->flags &= ~RRD_FLAG_OWN_LABELS; // no need of atomics at the constructor } else { ri->rrdlabels = rrdlabels_create(); - ri->flags |= RRD_FLAG_OWN_LABELS; + ri->flags |= RRD_FLAG_OWN_LABELS; // no need of atomics at the constructor } if(ri->rrdset) { if(unlikely((rrdset_flag_check(ri->rrdset, RRDSET_FLAG_HIDDEN)) || (ri->rrdset->state && ri->rrdset->state->is_ar_chart))) - ri->flags |= RRD_FLAG_HIDDEN; + ri->flags |= RRD_FLAG_HIDDEN; // no need of atomics at the constructor else - ri->flags &= ~RRD_FLAG_HIDDEN; + ri->flags &= ~RRD_FLAG_HIDDEN; // no need of atomics at the constructor } // we need this when loading from SQL if(unlikely(ri->id == ml_anomaly_rates_id)) - ri->flags |= RRD_FLAG_HIDDEN; + ri->flags |= RRD_FLAG_HIDDEN; // no need of atomics at the constructor - rrdmetrics_create(ri); + rrdmetrics_create_in_rrdinstance(ri); // signal the react callback to do the job rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_NEW_OBJECT); @@ -840,32 +812,32 @@ static void rrdinstance_conflict_callback(const char *id __maybe_unused, void *o if(ri->rrdset != ri_new->rrdset) { ri->rrdset = ri_new->rrdset; - if(ri->rrdset && (ri->flags & RRD_FLAG_OWN_LABELS)) { + if(ri->rrdset && rrd_flag_check(ri, RRD_FLAG_OWN_LABELS)) { DICTIONARY *old = ri->rrdlabels; ri->rrdlabels = ri->rrdset->state->chart_labels; - ri->flags &= ~RRD_FLAG_OWN_LABELS; + rrd_flag_clear(ri, RRD_FLAG_OWN_LABELS); rrdlabels_destroy(old); } - else if(!ri->rrdset && !(ri->flags & RRD_FLAG_OWN_LABELS)) { + else if(!ri->rrdset && !rrd_flag_check(ri, RRD_FLAG_OWN_LABELS)) { ri->rrdlabels = rrdlabels_create(); - ri->flags |= RRD_FLAG_OWN_LABELS; + rrd_flag_set(ri, RRD_FLAG_OWN_LABELS); } } if(ri->rrdset) { if(unlikely((rrdset_flag_check(ri->rrdset, RRDSET_FLAG_HIDDEN)) || (ri->rrdset->state && ri->rrdset->state->is_ar_chart))) - ri->flags |= RRD_FLAG_HIDDEN; + rrd_flag_set(ri, RRD_FLAG_HIDDEN); else - ri->flags &= ~RRD_FLAG_HIDDEN; + rrd_flag_clear(ri, RRD_FLAG_HIDDEN); } - ri->flags |= (ri_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); + rrd_flag_set(ri, ri_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no need for atomics on ri_new if(rrd_flag_is_collected(ri) && rrd_flag_is_archived(ri)) rrd_flag_set_collected(ri); - if(ri->flags & RRD_FLAG_UPDATED) - ri->flags |= RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT; + if(rrd_flag_is_updated(ri)) + rrd_flag_set(ri, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT); // free the new one rrdinstance_free(ri_new); @@ -876,10 +848,10 @@ static void rrdinstance_conflict_callback(const char *id __maybe_unused, void *o static void rrdinstance_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) { RRDINSTANCE *ri = value; - rrdinstance_trigger_updates(ri, false, true, __FUNCTION__); + rrdinstance_trigger_updates(ri, __FUNCTION__ ); } -void rrdinstances_create(RRDCONTEXT *rc) { +void rrdinstances_create_in_rrdcontext(RRDCONTEXT *rc) { if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO)) return; @@ -892,156 +864,42 @@ void rrdinstances_create(RRDCONTEXT *rc) { dictionary_register_react_callback(rc->rrdinstances, rrdinstance_react_callback, (void *)rc); } -void rrdinstances_destroy(RRDCONTEXT *rc) { +void rrdinstances_destroy_from_rrdcontext(RRDCONTEXT *rc) { if(unlikely(!rc || !rc->rrdinstances)) return; dictionary_destroy(rc->rrdinstances); rc->rrdinstances = NULL; } -static inline bool rrdinstance_should_be_deleted(RRDINSTANCE *ri) { - if(likely(!(ri->flags & RRD_FLAG_DELETED))) - return false; - - if(likely(!(ri->flags & RRD_FLAG_LIVE_RETENTION))) - return false; - - if(unlikely(ri->flags & RRD_FLAGS_PREVENTING_DELETIONS)) - return false; - - if(likely(ri->rrdset)) - return false; - - if(unlikely(dictionary_stats_referenced_items(ri->rrdmetrics) != 0)) - return false; - - if(unlikely(dictionary_stats_entries(ri->rrdmetrics) != 0)) - return false; - - if(ri->first_time_t || ri->last_time_t) - return false; - - return true; -} - -static void rrdinstance_trigger_updates(RRDINSTANCE *ri, bool force, bool escalate, const char *function __maybe_unused) { - if(unlikely(ri->flags & RRD_FLAG_DONT_PROCESS)) return; - if(unlikely(!force && !(ri->flags & RRD_FLAG_UPDATED))) return; +static void rrdinstance_trigger_updates(RRDINSTANCE *ri, const char *function) { + RRDSET *st = ri->rrdset; - // logs and stats - log_transition(NULL, ri, ri->rc, function); - rrdcontext_triggered_update_on_rrdinstance(); - - if(likely(ri->rrdset)) { - if(unlikely(ri->rrdset->priority != ri->priority)) { - ri->priority = ri->rrdset->priority; + if(likely(st)) { + if(unlikely(st->priority != ri->priority)) { + ri->priority = st->priority; rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY); } - if(unlikely(ri->rrdset->update_every != ri->update_every)) { - ri->update_every = ri->rrdset->update_every; + if(unlikely(st->update_every != ri->update_every)) { + ri->update_every = st->update_every; |