summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-09-06 19:02:39 +0300
committerGitHub <noreply@github.com>2022-09-06 19:02:39 +0300
commit58c79fd329df7d2187e4aee56fb4a58a9c02c3ae (patch)
treeb48bde3c183e1fa7b164e35a12e5ad87c73e001e /database
parent64a6920038d5f185710154790ff49e4913caac83 (diff)
Faster rrdcontext (#13629)
* moved rrdcontexts processing to worker thread * added loggings * check for aclk deeper in the code * removed unessesary logs * code re-organization; cleanup; more comments; better error handling; rrdcontext locks optimization; more clarity * updated 2 comments * make instances walkthrough reentrant; move context lock to the place is really needed * created macro for reentrant dictionary walkthrough * incremental updates on instances and metrics * renamed family of rrdcontext workers * prevent crash in case RRDINSTANCE or RRDMETRIC is freed during shutdown * prevent crash during rrddim save, on out of memory fatal() * always post-process contexts * added tracing for tracking the caller that trigger updates * more details on tracing info * fix for charts that are collected without metrics
Diffstat (limited to 'database')
-rw-r--r--database/rrd.h3
-rw-r--r--database/rrdcontext.c1948
-rw-r--r--database/rrddim.c8
-rw-r--r--database/rrdvar.c2
4 files changed, 1060 insertions, 901 deletions
diff --git a/database/rrd.h b/database/rrd.h
index 36c4e5d58f..4c3b13dfb1 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -916,7 +916,8 @@ struct rrdhost {
STORAGE_INSTANCE *storage_instance[RRD_STORAGE_TIERS]; // the database instances of the storage tiers
- RRDCONTEXTS *rrdctx_queue;
+ RRDCONTEXTS *rrdctx_hub_queue;
+ RRDCONTEXTS *rrdctx_post_processing_queue;
RRDCONTEXTS *rrdctx;
uuid_t host_uuid; // Global GUID for this host
diff --git a/database/rrdcontext.c b/database/rrdcontext.c
index b3af8b44cc..dcf8b94e22 100644
--- a/database/rrdcontext.c
+++ b/database/rrdcontext.c
@@ -10,10 +10,23 @@ int rrdcontext_enabled = CONFIG_BOOLEAN_YES;
#define MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST 5000
#define FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS 120
-#define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_SECS 1
+#define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC (1000 * USEC_PER_MS)
#define RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY 10
-// #define LOG_TRANSITIONS 1
+#define WORKER_JOB_HOSTS 1
+#define WORKER_JOB_CHECK 2
+#define WORKER_JOB_SEND 3
+#define WORKER_JOB_DEQUEUE 4
+#define WORKER_JOB_RETENTION 5
+#define WORKER_JOB_QUEUED 6
+#define WORKER_JOB_CLEANUP 7
+#define WORKER_JOB_CLEANUP_DELETE 8
+#define WORKER_JOB_PP_METRIC 9 // post-processing metrics
+#define WORKER_JOB_PP_INSTANCE 10 // post-processing instances
+#define WORKER_JOB_PP_CONTEXT 11 // post-processing contexts
+#define WORKER_JOB_HUB_QUEUE_SIZE 12
+#define WORKER_JOB_PP_QUEUE_SIZE 13
+
typedef enum {
RRD_FLAG_NONE = 0,
@@ -23,10 +36,11 @@ typedef enum {
RRD_FLAG_ARCHIVED = (1 << 3), // this object is not currently being collected
RRD_FLAG_OWN_LABELS = (1 << 4), // this instance has its own labels - not linked to an RRDSET
RRD_FLAG_LIVE_RETENTION = (1 << 5), // we have got live retention from the database
- RRD_FLAG_QUEUED = (1 << 6), // this context is currently queued to be dispatched to hub
- RRD_FLAG_DONT_PROCESS = (1 << 7), // don't process updates for this object
+ RRD_FLAG_QUEUED_FOR_HUB = (1 << 6), // this context is currently queued to be dispatched to hub
+ RRD_FLAG_QUEUED_FOR_POST_PROCESSING = (1 << 7), // this context is currently queued to be post-processed
RRD_FLAG_HIDDEN = (1 << 8), // don't expose this to the hub or the API
+ RRD_FLAG_UPDATE_REASON_TRIGGERED = (1 << 9), // the update was triggered by the child object
RRD_FLAG_UPDATE_REASON_LOAD_SQL = (1 << 10), // this object has just been loaded from SQL
RRD_FLAG_UPDATE_REASON_NEW_OBJECT = (1 << 11), // this object has just been created
RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT = (1 << 12), // we received an update on this object
@@ -54,7 +68,8 @@ typedef enum {
} RRD_FLAGS;
#define RRD_FLAG_ALL_UPDATE_REASONS ( \
- RRD_FLAG_UPDATE_REASON_LOAD_SQL \
+ RRD_FLAG_UPDATE_REASON_TRIGGERED \
+ |RRD_FLAG_UPDATE_REASON_LOAD_SQL \
|RRD_FLAG_UPDATE_REASON_NEW_OBJECT \
|RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT \
|RRD_FLAG_UPDATE_REASON_CHANGED_LINKING \
@@ -79,51 +94,110 @@ typedef enum {
#define RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS ( \
RRD_FLAG_ARCHIVED \
- |RRD_FLAG_DONT_PROCESS \
|RRD_FLAG_HIDDEN \
|RRD_FLAG_ALL_UPDATE_REASONS \
)
+#define RRD_FLAGS_REQUIRED_FOR_DELETIONS ( \
+ RRD_FLAG_DELETED \
+ |RRD_FLAG_LIVE_RETENTION \
+)
+
#define RRD_FLAGS_PREVENTING_DELETIONS ( \
- RRD_FLAG_QUEUED \
+ RRD_FLAG_QUEUED_FOR_HUB \
|RRD_FLAG_COLLECTED \
+ \
+ /* RRD_FLAG_QUEUED_FOR_POST_PROCESSING */ \
+ /* should not be here or nothing will be deleted */ \
)
-#define rrd_flag_set_updated(obj, reason) (obj)->flags |= (RRD_FLAG_UPDATED | (reason))
-#define rrd_flag_unset_updated(obj) (obj)->flags &= ~(RRD_FLAG_UPDATED | RRD_FLAG_ALL_UPDATE_REASONS)
-
-#define rrd_flag_set_collected(obj) do { \
- if(likely( !((obj)->flags & RRD_FLAG_COLLECTED))) \
- (obj)->flags |= (RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED | RRD_FLAG_UPDATED); \
- if(likely( ((obj)->flags & (RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED)))) \
- (obj)->flags &= ~(RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED); \
- if(unlikely(((obj)->flags & (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION)))) \
- (obj)->flags &= ~(RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); \
- if(unlikely(((obj)->flags & RRD_FLAG_DONT_PROCESS))) \
- (obj)->flags &= ~RRD_FLAG_DONT_PROCESS; \
-} while(0)
-
-#define rrd_flag_set_archived(obj) do { \
- if(likely( !((obj)->flags & RRD_FLAG_ARCHIVED))) \
- (obj)->flags |= (RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED | RRD_FLAG_UPDATED); \
- if(likely( ((obj)->flags & (RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED)))) \
- (obj)->flags &= ~(RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED); \
- if(unlikely(((obj)->flags & (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION)))) \
- (obj)->flags &= ~(RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); \
-} while(0)
-
-#define rrd_flag_set_deleted(obj, reason) do { \
- if(likely( !((obj)->flags & RRD_FLAG_DELETED))) \
- (obj)->flags |= (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION | RRD_FLAG_UPDATED | (reason)); \
- if(unlikely(((obj)->flags & RRD_FLAG_ARCHIVED))) \
- (obj)->flags &= ~RRD_FLAG_ARCHIVED; \
- if(likely( ((obj)->flags & RRD_FLAG_COLLECTED))) \
- (obj)->flags &= ~RRD_FLAG_COLLECTED; \
-} while(0)
-
-
-#define rrd_flag_is_collected(obj) ((obj)->flags & RRD_FLAG_COLLECTED)
-#define rrd_flag_is_archived(obj) ((obj)->flags & RRD_FLAG_ARCHIVED)
+// get all the flags of an object
+#define rrd_flags_get(obj) __atomic_load_n(&((obj)->flags), __ATOMIC_SEQ_CST)
+
+// check if ANY of the given flags (bits) is set
+#define rrd_flag_check(obj, flag) (rrd_flags_get(obj) & (flag))
+
+// check if ALL of the given flags (bits) are set
+#define rrd_flag_check_all(obj, flag) (rrd_flag_check(obj, flag) == (flag))
+
+// set one or more flags (bits)
+#define rrd_flag_set(obj, flag) __atomic_or_fetch(&((obj)->flags), flag, __ATOMIC_SEQ_CST)
+
+// clear one or more flags (bits)
+#define rrd_flag_clear(obj, flag) __atomic_and_fetch(&((obj)->flags), ~(flag), __ATOMIC_SEQ_CST)
+
+// replace the flags of an object, with the supplied ones
+#define rrd_flags_replace(obj, all_flags) __atomic_store_n(&((obj)->flags), all_flags, __ATOMIC_SEQ_CST)
+
+static inline void
+rrd_flag_add_remove_atomic(RRD_FLAGS *flags, RRD_FLAGS check, RRD_FLAGS conditionally_add, RRD_FLAGS always_remove) {
+ RRD_FLAGS expected, desired;
+ do {
+ expected = *flags;
+
+ desired = expected;
+ desired &= ~(always_remove);
+
+ if(!(expected & check))
+ desired |= (check | conditionally_add);
+
+ } while(!__atomic_compare_exchange_n(flags, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST));
+}
+
+#define rrd_flag_set_collected(obj) \
+ rrd_flag_add_remove_atomic(&((obj)->flags) \
+ /* check this flag */ \
+ , RRD_FLAG_COLLECTED \
+ \
+ /* add these flags together with the above, if the above is not already set */ \
+ , RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED | RRD_FLAG_UPDATED \
+ \
+ /* always remove these flags */ \
+ , RRD_FLAG_ARCHIVED \
+ | RRD_FLAG_DELETED \
+ | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED \
+ | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \
+ )
+
+#define rrd_flag_set_archived(obj) \
+ rrd_flag_add_remove_atomic(&((obj)->flags) \
+ /* check this flag */ \
+ , RRD_FLAG_ARCHIVED \
+ \
+ /* add these flags together with the above, if the above is not already set */ \
+ , RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED | RRD_FLAG_UPDATED \
+ \
+ /* always remove these flags */ \
+ , RRD_FLAG_COLLECTED \
+ | RRD_FLAG_DELETED \
+ | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED \
+ | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \
+ )
+
+#define rrd_flag_set_deleted(obj, reason) \
+ rrd_flag_add_remove_atomic(&((obj)->flags) \
+ /* check this flag */ \
+ , RRD_FLAG_DELETED \
+ \
+ /* add these flags together with the above, if the above is not already set */ \
+ , RRD_FLAG_UPDATE_REASON_ZERO_RETENTION | RRD_FLAG_UPDATED | (reason) \
+ \
+ /* always remove these flags */ \
+ , RRD_FLAG_ARCHIVED \
+ | RRD_FLAG_DELETED \
+ )
+
+#define rrd_flag_is_collected(obj) rrd_flag_check(obj, RRD_FLAG_COLLECTED)
+#define rrd_flag_is_archived(obj) rrd_flag_check(obj, RRD_FLAG_ARCHIVED)
+#define rrd_flag_is_deleted(obj) rrd_flag_check(obj, RRD_FLAG_DELETED)
+#define rrd_flag_is_updated(obj) rrd_flag_check(obj, RRD_FLAG_UPDATED)
+
+// mark an object as updated, providing reasons (additional bits)
+#define rrd_flag_set_updated(obj, reason) rrd_flag_set(obj, RRD_FLAG_UPDATED | (reason))
+
+// clear an object as being updated, clearing also all the reasons
+#define rrd_flag_unset_updated(obj) rrd_flag_clear(obj, RRD_FLAG_UPDATED | RRD_FLAG_ALL_UPDATE_REASONS)
+
static struct rrdcontext_reason {
RRD_FLAGS flag;
@@ -131,6 +205,7 @@ static struct rrdcontext_reason {
usec_t delay_ut;
} rrdcontext_reasons[] = {
// context related
+ { RRD_FLAG_UPDATE_REASON_TRIGGERED, "triggered transition", 60 * USEC_PER_SEC },
{ RRD_FLAG_UPDATE_REASON_NEW_OBJECT, "object created", 60 * USEC_PER_SEC },
{ RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT, "object updated", 60 * USEC_PER_SEC },
{ RRD_FLAG_UPDATE_REASON_LOAD_SQL, "loaded from sql", 60 * USEC_PER_SEC },
@@ -173,8 +248,6 @@ typedef struct rrdmetric {
RRD_FLAGS flags;
struct rrdinstance *ri;
-
- usec_t created_ut; // the time this object was created
} RRDMETRIC;
typedef struct rrdinstance {
@@ -199,6 +272,12 @@ typedef struct rrdinstance {
struct rrdcontext *rc;
DICTIONARY *rrdmetrics;
+
+ struct {
+ uint32_t collected_metrics; // a temporary variable to detect BEGIN/END without SET
+ // don't use it for other purposes
+ // it goes up and then resets to zero, on every iteration
+ } internal;
} RRDINSTANCE;
typedef struct rrdcontext {
@@ -287,31 +366,38 @@ static inline void rrdcontext_release(RRDCONTEXT_ACQUIRED *rca) {
dictionary_acquired_item_release((DICTIONARY *)rc->rrdhost->rrdctx, (DICTIONARY_ITEM *)rca);
}
-static void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, int job_id);
-static void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, int job_id);
+static void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, bool worker_jobs);
+static void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, bool worker_jobs);
#define rrdcontext_version_hash(host) rrdcontext_version_hash_with_callback(host, NULL, false, NULL)
static uint64_t rrdcontext_version_hash_with_callback(RRDHOST *host, void (*callback)(RRDCONTEXT *, bool, void *), bool snapshot, void *bundle);
-static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker);
-static void rrdcontext_garbage_collect(void);
+static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jobs);
+static void rrdcontext_garbage_collect_for_all_hosts(void);
void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc);
#define rrdcontext_lock(rc) netdata_mutex_lock(&((rc)->mutex))
#define rrdcontext_unlock(rc) netdata_mutex_unlock(&((rc)->mutex))
// ----------------------------------------------------------------------------
-// Updates triggers
+// Forward definitions
+
+static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc);
+static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused);
+static void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused, void *bundle __maybe_unused);
-static void rrdmetric_trigger_updates(RRDMETRIC *rm, bool force, bool escalate, const char *function);
-static void rrdinstance_trigger_updates(RRDINSTANCE *ri, bool force, bool escalate, const char *function);
-static void rrdcontext_trigger_updates(RRDCONTEXT *rc, bool force, const char *function);
+static void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function, RRD_FLAGS flags);
+static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs);
+
+static void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function);
+static void rrdinstance_trigger_updates(RRDINSTANCE *ri, const char *function);
+static void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function);
// ----------------------------------------------------------------------------
// visualizing flags
static void rrd_flags_to_buffer(RRD_FLAGS flags, BUFFER *wb) {
- if(flags & RRD_FLAG_QUEUED)
+ if(flags & RRD_FLAG_QUEUED_FOR_HUB)
buffer_strcat(wb, "QUEUED ");
if(flags & RRD_FLAG_DELETED)
@@ -332,11 +418,11 @@ static void rrd_flags_to_buffer(RRD_FLAGS flags, BUFFER *wb) {
if(flags & RRD_FLAG_LIVE_RETENTION)
buffer_strcat(wb, "LIVE_RETENTION ");
- if(flags & RRD_FLAG_DONT_PROCESS)
- buffer_strcat(wb, "DONT_PROCESS ");
-
if(flags & RRD_FLAG_HIDDEN)
buffer_strcat(wb, "HIDDEN ");
+
+ if(flags & RRD_FLAG_QUEUED_FOR_POST_PROCESSING)
+ buffer_strcat(wb, "PENDING_UPDATES ");
}
static void rrd_reasons_to_buffer(RRD_FLAGS flags, BUFFER *wb) {
@@ -351,54 +437,10 @@ static void rrd_reasons_to_buffer(RRD_FLAGS flags, BUFFER *wb) {
}
// ----------------------------------------------------------------------------
-// logging of all data collected
-
-#ifdef LOG_TRANSITIONS
-static void log_transition(RRDMETRIC *rm, RRDINSTANCE *ri, RRDCONTEXT *rc, const char *function) {
- BUFFER *wb = buffer_create(1000);
- const char *triggered_on = "triggered on ";
-
- buffer_sprintf(wb, "RRD TRANSITION: %s() ", function);
-
- if(rm) {
- buffer_sprintf(wb, "%smetric '%s' of ", triggered_on, string2str(rm->id));
- triggered_on = "";
- }
-
- if(ri) {
- buffer_sprintf(wb, "%sinstance '%s' of ", triggered_on, string2str(ri->id));
- triggered_on = "";
- }
-
- buffer_sprintf(wb, "%scontext '%s' ", triggered_on, string2str(rc->id));
-
- RRD_FLAGS flags = rc->flags;
- const char *we_are = "context";
- if(ri) {
- flags = ri->flags;
- we_are = "instance";
- }
- if(rm) {
- flags = rm->flags;
- we_are = "metric";
- }
-
- buffer_sprintf(wb, "%s flags: ", we_are);
- rrd_flags_to_buffer(flags, wb);
-
- buffer_strcat(wb, ", having reasons: ");
- rrd_reasons_to_buffer(flags, wb);
-
- internal_error(true, "%s", buffer_tostring(wb));
- buffer_free(wb);
-}
-#else
-#define log_transition(rm, ri, rc, function) debug_dummy()
-#endif
-
-// ----------------------------------------------------------------------------
// RRDMETRIC
+// free the contents of RRDMETRIC.
+// RRDMETRIC itself is managed by DICTIONARY - no need to free it here.
static void rrdmetric_free(RRDMETRIC *rm) {
string_freez(rm->id);
string_freez(rm->name);
@@ -408,60 +450,8 @@ static void rrdmetric_free(RRDMETRIC *rm) {
rm->ri = NULL;
}
-static void rrdmetric_update_retention(RRDMETRIC *rm) {
- time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
-
- if(rm->rrddim) {
- min_first_time_t = rrddim_first_entry_t(rm->rrddim);
- max_last_time_t = rrddim_last_entry_t(rm->rrddim);
- }
-#ifdef ENABLE_DBENGINE
- else {
- RRDHOST *rrdhost = rm->ri->rc->rrdhost;
- for (int tier = 0; tier < storage_tiers; tier++) {
- if(!rrdhost->storage_instance[tier]) continue;
-
- time_t first_time_t, last_time_t;
- if (rrdeng_metric_retention_by_uuid(rrdhost->storage_instance[tier], &rm->uuid, &first_time_t, &last_time_t) == 0) {
- if (first_time_t < min_first_time_t)
- min_first_time_t = first_time_t;
-
- if (last_time_t > max_last_time_t)
- max_last_time_t = last_time_t;
- }
- }
- }
-#endif
-
- if(min_first_time_t == LONG_MAX)
- min_first_time_t = 0;
-
- if(min_first_time_t > max_last_time_t) {
- internal_error(true, "RRDMETRIC: retention of '%s' is flipped", string2str(rm->id));
- time_t tmp = min_first_time_t;
- min_first_time_t = max_last_time_t;
- max_last_time_t = tmp;
- }
-
- // check if retention changed
-
- if (min_first_time_t != rm->first_time_t) {
- rm->first_time_t = min_first_time_t;
- rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
- }
-
- if (max_last_time_t != rm->last_time_t) {
- rm->last_time_t = max_last_time_t;
- rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
- }
-
- if(unlikely(!rm->first_time_t && !rm->last_time_t))
- rrd_flag_set_deleted(rm, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
-
- rm->flags |= RRD_FLAG_LIVE_RETENTION;
-}
-
// called when this rrdmetric is inserted to the rrdmetrics dictionary of a rrdinstance
+// the constructor of the rrdmetric object
static void rrdmetric_insert_callback(const char *id __maybe_unused, void *value, void *data) {
RRDMETRIC *rm = value;
@@ -469,15 +459,14 @@ static void rrdmetric_insert_callback(const char *id __maybe_unused, void *value
rm->ri = data;
// remove flags that we need to figure out at runtime
- rm->flags = rm->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS;
-
- rm->created_ut = now_realtime_usec();
+ rm->flags = rm->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics
// signal the react callback to do the job
rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_NEW_OBJECT);
}
// called when this rrdmetric is deleted from the rrdmetrics dictionary of a rrdinstance
+// the destructor of the rrdmetric object
static void rrdmetric_delete_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) {
RRDMETRIC *rm = value;
@@ -488,6 +477,7 @@ static void rrdmetric_delete_callback(const char *id __maybe_unused, void *value
}
// called when the same rrdmetric is inserted again to the rrdmetrics dictionary of a rrdinstance
+// while this is called, the dictionary is write locked, but there may be other users of the object
static void rrdmetric_conflict_callback(const char *id __maybe_unused, void *oldv, void *newv, void *data __maybe_unused) {
RRDMETRIC *rm = oldv;
RRDMETRIC *rm_new = newv;
@@ -537,26 +527,27 @@ static void rrdmetric_conflict_callback(const char *id __maybe_unused, void *old
rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
}
- rm->flags |= (rm_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS);
+ rrd_flag_set(rm, rm_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no needs for atomics on rm_new
if(rrd_flag_is_collected(rm) && rrd_flag_is_archived(rm))
rrd_flag_set_collected(rm);
- if(rm->flags & RRD_FLAG_UPDATED)
- rm->flags |= RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT;
+ if(rrd_flag_check(rm, RRD_FLAG_UPDATED))
+ rrd_flag_set(rm, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT);
rrdmetric_free(rm_new);
// the react callback will continue from here
}
+// this is called after the insert or the conflict callbacks,
+// but the dictionary is now unlocked
static void rrdmetric_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) {
RRDMETRIC *rm = value;
-
- rrdmetric_trigger_updates(rm, false, true, __FUNCTION__);
+ rrdmetric_trigger_updates(rm, __FUNCTION__ );
}
-static void rrdmetrics_create(RRDINSTANCE *ri) {
+static void rrdmetrics_create_in_rrdinstance(RRDINSTANCE *ri) {
if(unlikely(!ri)) return;
if(likely(ri->rrdmetrics)) return;
@@ -567,51 +558,26 @@ static void rrdmetrics_create(RRDINSTANCE *ri) {
dictionary_register_react_callback(ri->rrdmetrics, rrdmetric_react_callback, (void *)ri);
}
-static void rrdmetrics_destroy(RRDINSTANCE *ri) {
+static void rrdmetrics_destroy_from_rrdinstance(RRDINSTANCE *ri) {
if(unlikely(!ri || !ri->rrdmetrics)) return;
dictionary_destroy(ri->rrdmetrics);
ri->rrdmetrics = NULL;
}
-static inline bool rrdmetric_should_be_deleted(RRDMETRIC *rm) {
- if(likely(!(rm->flags & RRD_FLAG_DELETED)))
- return false;
-
- if(likely(!(rm->flags & RRD_FLAG_LIVE_RETENTION)))
- return false;
-
- if(unlikely(rm->flags & RRD_FLAGS_PREVENTING_DELETIONS))
- return false;
-
- if(likely(rm->rrddim))
- return false;
-
- //if((now_realtime_usec() - rm->created_ut) < 600 * USEC_PER_SEC)
- // return false;
-
- rrdmetric_update_retention(rm);
- if(rm->first_time_t || rm->last_time_t)
- return false;
-
- return true;
-}
-
-static void rrdmetric_trigger_updates(RRDMETRIC *rm, bool force, bool escalate, const char *function __maybe_unused) {
- if(likely(!force && !(rm->flags & RRD_FLAG_UPDATED))) return;
-
- // logs and statistics
- log_transition(rm, rm->ri, rm->ri->rc, function);
- rrdcontext_triggered_update_on_rrdmetric();
-
- if(unlikely(rrd_flag_is_collected(rm)) && (!rm->rrddim || rm->flags & RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD))
+// trigger post-processing of the rrdmetric, escalating changes to the rrdinstance it belongs
+static void rrdmetric_trigger_updates(RRDMETRIC *rm, const char *function) {
+ if(unlikely(rrd_flag_is_collected(rm)) && (!rm->rrddim || rrd_flag_check(rm, RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD)))
rrd_flag_set_archived(rm);
- rrdmetric_update_retention(rm);
-
- if(unlikely(escalate && rm->flags & RRD_FLAG_UPDATED))
- rrdinstance_trigger_updates(rm->ri, true, true, __FUNCTION__);
+ if(rrd_flag_is_updated(rm) || !rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION)) {
+ rrd_flag_set_updated(rm->ri, RRD_FLAG_UPDATE_REASON_TRIGGERED);
+ rrdcontext_queue_for_post_processing(rm->ri->rc, function, rm->flags);
+ }
}
+// ----------------------------------------------------------------------------
+// RRDMETRIC HOOKS ON RRDDIM
+
static inline void rrdmetric_from_rrddim(RRDDIM *rd) {
if(unlikely(!rd->rrdset))
fatal("RRDMETRIC: rrddim '%s' does not have a rrdset.", rrddim_id(rd));
@@ -627,7 +593,7 @@ static inline void rrdmetric_from_rrddim(RRDDIM *rd) {
RRDMETRIC trm = {
.id = string_dup(rd->id),
.name = string_dup(rd->name),
- .flags = RRD_FLAG_NONE,
+ .flags = RRD_FLAG_NONE, // no need for atomics
.rrddim = rd,
};
uuid_copy(trm.uuid, rd->metric_uuid);
@@ -648,6 +614,10 @@ static inline RRDMETRIC *rrddim_get_rrdmetric_with_trace(RRDDIM *rd, const char
}
RRDMETRIC *rm = rrdmetric_acquired_value(rd->rrdmetric);
+ if(unlikely(!rm)) {
+ error("RRDMETRIC: RRDDIM '%s' lost the link to its RRDMETRIC at %s()", rrddim_id(rd), function);
+ return NULL;
+ }
if(unlikely(rm->rrddim != rd))
fatal("RRDMETRIC: '%s' is not linked to RRDDIM '%s' at %s()", string2str(rm->id), rrddim_id(rd), function);
@@ -663,7 +633,7 @@ static inline void rrdmetric_rrddim_is_freed(RRDDIM *rd) {
rrd_flag_set_archived(rm);
rm->rrddim = NULL;
- rrdmetric_trigger_updates(rm, false, true, __FUNCTION__);
+ rrdmetric_trigger_updates(rm, __FUNCTION__ );
rrdmetric_release(rd->rrdmetric);
rd->rrdmetric = NULL;
}
@@ -677,7 +647,7 @@ static inline void rrdmetric_updated_rrddim_flags(RRDDIM *rd) {
rrd_flag_set_archived(rm);
}
- rrdmetric_trigger_updates(rm, false, true, __FUNCTION__);
+ rrdmetric_trigger_updates(rm, __FUNCTION__ );
}
static inline void rrdmetric_collected_rrddim(RRDDIM *rd) {
@@ -687,7 +657,10 @@ static inline void rrdmetric_collected_rrddim(RRDDIM *rd) {
if(unlikely(!rrd_flag_is_collected(rm)))
rrd_flag_set_collected(rm);
- rrdmetric_trigger_updates(rm, false, true, __FUNCTION__);
+ // we use this variable to detect BEGIN/END without SET
+ rm->ri->internal.collected_metrics++;
+
+ rrdmetric_trigger_updates(rm, __FUNCTION__ );
}
// ----------------------------------------------------------------------------
@@ -695,10 +668,10 @@ static inline void rrdmetric_collected_rrddim(RRDDIM *rd) {
static void rrdinstance_free(RRDINSTANCE *ri) {
- if(ri->flags & RRD_FLAG_OWN_LABELS)
+ if(rrd_flag_check(ri, RRD_FLAG_OWN_LABELS))
dictionary_destroy(ri->rrdlabels);
- rrdmetrics_destroy(ri);
+ rrdmetrics_destroy_from_rrdinstance(ri);
string_freez(ri->id);
string_freez(ri->name);
string_freez(ri->title);
@@ -727,33 +700,32 @@ static void rrdinstance_insert_callback(const char *id __maybe_unused, void *val
// link it to its parent
ri->rc = data;
- ri->flags = ri->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS;
+ ri->flags = ri->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics
if(!ri->name)
ri->name = string_dup(ri->id);
if(ri->rrdset && ri->rrdset->state) {
ri->rrdlabels = ri->rrdset->state->chart_labels;
- if(ri->flags & RRD_FLAG_OWN_LABELS)
- ri->flags &= ~RRD_FLAG_OWN_LABELS;
+ ri->flags &= ~RRD_FLAG_OWN_LABELS; // no need of atomics at the constructor
}
else {
ri->rrdlabels = rrdlabels_create();
- ri->flags |= RRD_FLAG_OWN_LABELS;
+ ri->flags |= RRD_FLAG_OWN_LABELS; // no need of atomics at the constructor
}
if(ri->rrdset) {
if(unlikely((rrdset_flag_check(ri->rrdset, RRDSET_FLAG_HIDDEN)) || (ri->rrdset->state && ri->rrdset->state->is_ar_chart)))
- ri->flags |= RRD_FLAG_HIDDEN;
+ ri->flags |= RRD_FLAG_HIDDEN; // no need of atomics at the constructor
else
- ri->flags &= ~RRD_FLAG_HIDDEN;
+ ri->flags &= ~RRD_FLAG_HIDDEN; // no need of atomics at the constructor
}
// we need this when loading from SQL
if(unlikely(ri->id == ml_anomaly_rates_id))
- ri->flags |= RRD_FLAG_HIDDEN;
+ ri->flags |= RRD_FLAG_HIDDEN; // no need of atomics at the constructor
- rrdmetrics_create(ri);
+ rrdmetrics_create_in_rrdinstance(ri);
// signal the react callback to do the job
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_NEW_OBJECT);
@@ -840,32 +812,32 @@ static void rrdinstance_conflict_callback(const char *id __maybe_unused, void *o
if(ri->rrdset != ri_new->rrdset) {
ri->rrdset = ri_new->rrdset;
- if(ri->rrdset && (ri->flags & RRD_FLAG_OWN_LABELS)) {
+ if(ri->rrdset && rrd_flag_check(ri, RRD_FLAG_OWN_LABELS)) {
DICTIONARY *old = ri->rrdlabels;
ri->rrdlabels = ri->rrdset->state->chart_labels;
- ri->flags &= ~RRD_FLAG_OWN_LABELS;
+ rrd_flag_clear(ri, RRD_FLAG_OWN_LABELS);
rrdlabels_destroy(old);
}
- else if(!ri->rrdset && !(ri->flags & RRD_FLAG_OWN_LABELS)) {
+ else if(!ri->rrdset && !rrd_flag_check(ri, RRD_FLAG_OWN_LABELS)) {
ri->rrdlabels = rrdlabels_create();
- ri->flags |= RRD_FLAG_OWN_LABELS;
+ rrd_flag_set(ri, RRD_FLAG_OWN_LABELS);
}
}
if(ri->rrdset) {
if(unlikely((rrdset_flag_check(ri->rrdset, RRDSET_FLAG_HIDDEN)) || (ri->rrdset->state && ri->rrdset->state->is_ar_chart)))
- ri->flags |= RRD_FLAG_HIDDEN;
+ rrd_flag_set(ri, RRD_FLAG_HIDDEN);
else
- ri->flags &= ~RRD_FLAG_HIDDEN;
+ rrd_flag_clear(ri, RRD_FLAG_HIDDEN);
}
- ri->flags |= (ri_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS);
+ rrd_flag_set(ri, ri_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no need for atomics on ri_new
if(rrd_flag_is_collected(ri) && rrd_flag_is_archived(ri))
rrd_flag_set_collected(ri);
- if(ri->flags & RRD_FLAG_UPDATED)
- ri->flags |= RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT;
+ if(rrd_flag_is_updated(ri))
+ rrd_flag_set(ri, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT);
// free the new one
rrdinstance_free(ri_new);
@@ -876,10 +848,10 @@ static void rrdinstance_conflict_callback(const char *id __maybe_unused, void *o
static void rrdinstance_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) {
RRDINSTANCE *ri = value;
- rrdinstance_trigger_updates(ri, false, true, __FUNCTION__);
+ rrdinstance_trigger_updates(ri, __FUNCTION__ );
}
-void rrdinstances_create(RRDCONTEXT *rc) {
+void rrdinstances_create_in_rrdcontext(RRDCONTEXT *rc) {
if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
return;
@@ -892,156 +864,42 @@ void rrdinstances_create(RRDCONTEXT *rc) {
dictionary_register_react_callback(rc->rrdinstances, rrdinstance_react_callback, (void *)rc);
}
-void rrdinstances_destroy(RRDCONTEXT *rc) {
+void rrdinstances_destroy_from_rrdcontext(RRDCONTEXT *rc) {
if(unlikely(!rc || !rc->rrdinstances)) return;
dictionary_destroy(rc->rrdinstances);
rc->rrdinstances = NULL;
}
-static inline bool rrdinstance_should_be_deleted(RRDINSTANCE *ri) {
- if(likely(!(ri->flags & RRD_FLAG_DELETED)))
- return false;
-
- if(likely(!(ri->flags & RRD_FLAG_LIVE_RETENTION)))
- return false;
-
- if(unlikely(ri->flags & RRD_FLAGS_PREVENTING_DELETIONS))
- return false;
-
- if(likely(ri->rrdset))
- return false;
-
- if(unlikely(dictionary_stats_referenced_items(ri->rrdmetrics) != 0))
- return false;
-
- if(unlikely(dictionary_stats_entries(ri->rrdmetrics) != 0))
- return false;
-
- if(ri->first_time_t || ri->last_time_t)
- return false;
-
- return true;
-}
-
-static void rrdinstance_trigger_updates(RRDINSTANCE *ri, bool force, bool escalate, const char *function __maybe_unused) {
- if(unlikely(ri->flags & RRD_FLAG_DONT_PROCESS)) return;
- if(unlikely(!force && !(ri->flags & RRD_FLAG_UPDATED))) return;
+static void rrdinstance_trigger_updates(RRDINSTANCE *ri, const char *function) {
+ RRDSET *st = ri->rrdset;
- // logs and stats
- log_transition(NULL, ri, ri->rc, function);
- rrdcontext_triggered_update_on_rrdinstance();
-
- if(likely(ri->rrdset)) {
- if(unlikely(ri->rrdset->priority != ri->priority)) {
- ri->priority = ri->rrdset->priority;
+ if(likely(st)) {
+ if(unlikely(st->priority != ri->priority)) {
+ ri->priority = st->priority;
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY);
}
- if(unlikely(ri->rrdset->update_every != ri->update_every)) {
- ri->update_every = ri->rrdset->update_every;
+ if(unlikely(st->update_every != ri->update_every)) {
+ ri->update_every = st->update_every;