summaryrefslogtreecommitdiffstats
path: root/database/rrdcontext.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/rrdcontext.c')
-rw-r--r--database/rrdcontext.c188
1 files changed, 108 insertions, 80 deletions
diff --git a/database/rrdcontext.c b/database/rrdcontext.c
index 4a77fea1b4..d4f5d1d848 100644
--- a/database/rrdcontext.c
+++ b/database/rrdcontext.c
@@ -8,6 +8,8 @@
int rrdcontext_enabled = CONFIG_BOOLEAN_YES;
+// #define LOG_POST_PROCESSING_QUEUE_INSERTIONS 1
+
#define MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST 5000
#define FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS 120
#define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_USEC (1000 * USEC_PER_MS)
@@ -432,11 +434,11 @@ static void rrdmetric_free(RRDMETRIC *rm) {
// called when this rrdmetric is inserted to the rrdmetrics dictionary of a rrdinstance
// the constructor of the rrdmetric object
-static void rrdmetric_insert_callback(const char *id __maybe_unused, void *value, void *data) {
+static void rrdmetric_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdinstance) {
RRDMETRIC *rm = value;
// link it to its parent
- rm->ri = data;
+ rm->ri = rrdinstance;
// remove flags that we need to figure out at runtime
rm->flags = rm->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics
@@ -447,7 +449,7 @@ static void rrdmetric_insert_callback(const char *id __maybe_unused, void *value
// called when this rrdmetric is deleted from the rrdmetrics dictionary of a rrdinstance
// the destructor of the rrdmetric object
-static void rrdmetric_delete_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) {
+static void rrdmetric_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdinstance __maybe_unused) {
RRDMETRIC *rm = value;
internal_error(rm->rrddim, "RRDMETRIC: '%s' is freed but there is a RRDDIM linked to it.", string2str(rm->id));
@@ -458,7 +460,7 @@ static void rrdmetric_delete_callback(const char *id __maybe_unused, void *value
// called when the same rrdmetric is inserted again to the rrdmetrics dictionary of a rrdinstance
// while this is called, the dictionary is write locked, but there may be other users of the object
-static void rrdmetric_conflict_callback(const char *id __maybe_unused, void *oldv, void *newv, void *data __maybe_unused) {
+static bool rrdmetric_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *oldv, void *newv, void *rrdinstance __maybe_unused) {
RRDMETRIC *rm = oldv;
RRDMETRIC *rm_new = newv;
@@ -518,11 +520,12 @@ static void rrdmetric_conflict_callback(const char *id __maybe_unused, void *old
rrdmetric_free(rm_new);
// the react callback will continue from here
+ return rrd_flag_is_updated(rm);
}
// this is called after the insert or the conflict callbacks,
// but the dictionary is now unlocked
-static void rrdmetric_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) {
+static void rrdmetric_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdinstance __maybe_unused) {
RRDMETRIC *rm = value;
rrdmetric_trigger_updates(rm, __FUNCTION__ );
}
@@ -531,11 +534,11 @@ static void rrdmetrics_create_in_rrdinstance(RRDINSTANCE *ri) {
if(unlikely(!ri)) return;
if(likely(ri->rrdmetrics)) return;
- ri->rrdmetrics = dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE);
- dictionary_register_insert_callback(ri->rrdmetrics, rrdmetric_insert_callback, (void *)ri);
- dictionary_register_delete_callback(ri->rrdmetrics, rrdmetric_delete_callback, (void *)ri);
- dictionary_register_conflict_callback(ri->rrdmetrics, rrdmetric_conflict_callback, (void *)ri);
- dictionary_register_react_callback(ri->rrdmetrics, rrdmetric_react_callback, (void *)ri);
+ ri->rrdmetrics = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_insert_callback(ri->rrdmetrics, rrdmetric_insert_callback, ri);
+ dictionary_register_delete_callback(ri->rrdmetrics, rrdmetric_delete_callback, ri);
+ dictionary_register_conflict_callback(ri->rrdmetrics, rrdmetric_conflict_callback, ri);
+ dictionary_register_react_callback(ri->rrdmetrics, rrdmetric_react_callback, ri);
}
static void rrdmetrics_destroy_from_rrdinstance(RRDINSTANCE *ri) {
@@ -669,7 +672,7 @@ static void rrdinstance_free(RRDINSTANCE *ri) {
ri->rrdset = NULL;
}
-static void rrdinstance_insert_callback(const char *id __maybe_unused, void *value, void *data) {
+static void rrdinstance_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdcontext) {
static STRING *ml_anomaly_rates_id = NULL;
if(unlikely(!ml_anomaly_rates_id))
@@ -678,7 +681,7 @@ static void rrdinstance_insert_callback(const char *id __maybe_unused, void *val
RRDINSTANCE *ri = value;
// link it to its parent
- ri->rc = data;
+ ri->rc = rrdcontext;
ri->flags = ri->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics
@@ -711,9 +714,7 @@ static void rrdinstance_insert_callback(const char *id __maybe_unused, void *val
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_NEW_OBJECT);
}
-static void rrdinstance_delete_callback(const char *id, void *value, void *data) {
- (void)id;
- RRDCONTEXT *rc = data; (void)rc;
+static void rrdinstance_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdcontext __maybe_unused) {
RRDINSTANCE *ri = (RRDINSTANCE *)value;
internal_error(ri->rrdset, "RRDINSTANCE: '%s' is freed but there is a RRDSET linked to it.", string2str(ri->id));
@@ -721,7 +722,7 @@ static void rrdinstance_delete_callback(const char *id, void *value, void *data)
rrdinstance_free(ri);
}
-static void rrdinstance_conflict_callback(const char *id __maybe_unused, void *oldv, void *newv, void *data __maybe_unused) {
+static bool rrdinstance_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *oldv, void *newv, void *rrdcontext __maybe_unused) {
RRDINSTANCE *ri = (RRDINSTANCE *)oldv;
RRDINSTANCE *ri_new = (RRDINSTANCE *)newv;
@@ -739,10 +740,10 @@ static void rrdinstance_conflict_callback(const char *id __maybe_unused, void *o
rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LINKING);
}
- if(ri->rrdset && ri->rrdset->chart_uuid && uuid_compare(ri->uuid, *ri->rrdset->chart_uuid) != 0) {
+ if(ri->rrdset && uuid_compare(ri->uuid, ri->rrdset->chart_uuid) != 0) {
char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN];
uuid_unparse(ri->uuid, uuid1);
- uuid_unparse(*ri->rrdset->chart_uuid, uuid2);
+ uuid_unparse(ri->rrdset->chart_uuid, uuid2);
internal_error(true, "RRDINSTANCE: '%s' is linked to RRDSET '%s' but they have different UUIDs. RRDINSTANCE has '%s', RRDSET has '%s'", string2str(ri->id), rrdset_id(ri->rrdset), uuid1, uuid2);
}
@@ -823,9 +824,10 @@ static void rrdinstance_conflict_callback(const char *id __maybe_unused, void *o
rrdinstance_free(ri_new);
// the react callback will continue from here
+ return rrd_flag_is_updated(ri);
}
-static void rrdinstance_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) {
+static void rrdinstance_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdcontext __maybe_unused) {
RRDINSTANCE *ri = value;
rrdinstance_trigger_updates(ri, __FUNCTION__ );
@@ -837,11 +839,11 @@ void rrdinstances_create_in_rrdcontext(RRDCONTEXT *rc) {
if(unlikely(!rc || rc->rrdinstances)) return;
- rc->rrdinstances = dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE);
- dictionary_register_insert_callback(rc->rrdinstances, rrdinstance_insert_callback, (void *)rc);
- dictionary_register_delete_callback(rc->rrdinstances, rrdinstance_delete_callback, (void *)rc);
- dictionary_register_conflict_callback(rc->rrdinstances, rrdinstance_conflict_callback, (void *)rc);
- dictionary_register_react_callback(rc->rrdinstances, rrdinstance_react_callback, (void *)rc);
+ rc->rrdinstances = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_insert_callback(rc->rrdinstances, rrdinstance_insert_callback, rc);
+ dictionary_register_delete_callback(rc->rrdinstances, rrdinstance_delete_callback, rc);
+ dictionary_register_conflict_callback(rc->rrdinstances, rrdinstance_conflict_callback, rc);
+ dictionary_register_react_callback(rc->rrdinstances, rrdinstance_react_callback, rc);
}
void rrdinstances_destroy_from_rrdcontext(RRDCONTEXT *rc) {
@@ -907,7 +909,7 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) {
.flags = RRD_FLAG_NONE, // no need for atomics
.rrdset = st,
};
- uuid_copy(tri.uuid, *st->chart_uuid);
+ uuid_copy(tri.uuid, st->chart_uuid);
RRDINSTANCE_ACQUIRED *ria = (RRDINSTANCE_ACQUIRED *)dictionary_set_and_acquire_item(rc->rrdinstances, string2str(tri.id), &tri, sizeof(tri));
@@ -934,7 +936,6 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) {
RRDINSTANCE *ri_old = rrdinstance_acquired_value(ria_old);
// migrate all dimensions to the new metrics
- rrdset_rdlock(st);
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
if (!rd->rrdmetric) continue;
@@ -950,7 +951,7 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) {
rrdmetric_from_rrddim(rd);
}
- rrdset_unlock(st);
+ rrddim_foreach_done(rd);
// mark the old instance, ready to be deleted
if(!rrd_flag_check(ri_old, RRD_FLAG_OWN_LABELS))
@@ -966,7 +967,7 @@ static inline void rrdinstance_from_rrdset(RRDSET *st) {
/*
// trigger updates on the old context
- if(!dictionary_stats_entries(rc_old->rrdinstances) && !dictionary_stats_referenced_items(rc_old->rrdinstances)) {
+ if(!dictionary_entries(rc_old->rrdinstances) && !dictionary_stats_referenced_items(rc_old->rrdinstances)) {
rrdcontext_lock(rc_old);
rc_old->flags = ((rc_old->flags & RRD_FLAG_QUEUED)?RRD_FLAG_QUEUED:RRD_FLAG_NONE)|RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION;
rc_old->first_time_t = 0;
@@ -1102,9 +1103,8 @@ static void rrdcontext_freez(RRDCONTEXT *rc) {
string_freez(rc->family);
}
-static void rrdcontext_insert_callback(const char *id, void *value, void *data) {
- (void)id;
- RRDHOST *host = (RRDHOST *)data;
+static void rrdcontext_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost) {
+ RRDHOST *host = (RRDHOST *)rrdhost;
RRDCONTEXT *rc = (RRDCONTEXT *)value;
rc->rrdhost = host;
@@ -1167,10 +1167,7 @@ static void rrdcontext_insert_callback(const char *id, void *value, void *data)
rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_NEW_OBJECT);
}
-static void rrdcontext_delete_callback(const char *id, void *value, void *data) {
- (void)id;
- RRDHOST *host = (RRDHOST *)data;
- (void)host;
+static void rrdcontext_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost __maybe_unused) {
RRDCONTEXT *rc = (RRDCONTEXT *)value;
@@ -1179,18 +1176,14 @@ static void rrdcontext_delete_callback(const char *id, void *value, void *data)
rrdcontext_freez(rc);
}
-static void rrdcontext_conflict_callback(const char *id, void *oldv, void *newv, void *data) {
- (void)id;
- RRDHOST *host = (RRDHOST *)data;
- (void)host;
-
+static bool rrdcontext_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *oldv, void *newv, void *rrdhost __maybe_unused) {
RRDCONTEXT *rc = (RRDCONTEXT *)oldv;
RRDCONTEXT *rc_new = (RRDCONTEXT *)newv;
//current rc is not archived, new_rc is archived, dont merge
if (!rrd_flag_is_archived(rc) && rrd_flag_is_archived(rc_new)) {
rrdcontext_freez(rc_new);
- return;
+ return false;
}
rrdcontext_lock(rc);
@@ -1246,11 +1239,11 @@ static void rrdcontext_conflict_callback(const char *id, void *oldv, void *newv,
rrdcontext_freez(rc_new);
// the react callback will continue from here
+ return rrd_flag_is_updated(rc);
}
-static void rrdcontext_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) {
+static void rrdcontext_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost __maybe_unused) {
RRDCONTEXT *rc = (RRDCONTEXT *)value;
-
rrdcontext_trigger_updates(rc, __FUNCTION__ );
}
@@ -1259,33 +1252,35 @@ static void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function) {
rrdcontext_queue_for_post_processing(rc, function, rc->flags);
}
-static void rrdcontext_hub_queue_insert_callback(const char *name __maybe_unused, void *context, void *data __maybe_unused) {
+static void rrdcontext_hub_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
RRDCONTEXT *rc = context;
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB);
rc->queue.queued_ut = now_realtime_usec();
rc->queue.queued_flags = rrd_flags_get(rc);
}
-static void rrdcontext_hub_queue_delete_callback(const char *name __maybe_unused, void *context, void *data __maybe_unused) {
+static void rrdcontext_hub_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
RRDCONTEXT *rc = context;
rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB);
}
-static void rrdcontext_hub_queue_conflict_callback(const char *name __maybe_unused, void *context, void *new_context __maybe_unused, void *data __maybe_unused) {
+static bool rrdcontext_hub_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) {
// context and new_context are the same
// we just need to update the timings
RRDCONTEXT *rc = context;
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB);
rc->queue.queued_ut = now_realtime_usec();
rc->queue.queued_flags |= rrd_flags_get(rc);
+
+ return true;
}
-static void rrdcontext_post_processing_queue_insert_callback(const char *name __maybe_unused, void *context, void *data __maybe_unused) {
+static void rrdcontext_post_processing_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
RRDCONTEXT *rc = context;
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
}
-static void rrdcontext_post_processing_queue_delete_callback(const char *name __maybe_unused, void *context, void *data __maybe_unused) {
+static void rrdcontext_post_processing_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
RRDCONTEXT *rc = context;
rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
}
@@ -1297,23 +1292,19 @@ void rrdhost_create_rrdcontexts(RRDHOST *host) {
if(unlikely(!host)) return;
if(likely(host->rrdctx)) return;
- host->rrdctx = (RRDCONTEXTS *)dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE);
- dictionary_register_insert_callback((DICTIONARY *)host->rrdctx, rrdcontext_insert_callback, (void *)host);
- dictionary_register_delete_callback((DICTIONARY *)host->rrdctx, rrdcontext_delete_callback, (void *)host);
- dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx, rrdcontext_conflict_callback, (void *)host);
- dictionary_register_react_callback((DICTIONARY *)host->rrdctx, rrdcontext_react_callback, (void *)host);
+ host->rrdctx = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_insert_callback((DICTIONARY *)host->rrdctx, rrdcontext_insert_callback, host);
+ dictionary_register_delete_callback((DICTIONARY *)host->rrdctx, rrdcontext_delete_callback, host);
+ dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx, rrdcontext_conflict_callback, host);
+ dictionary_register_react_callback((DICTIONARY *)host->rrdctx, rrdcontext_react_callback, host);
- host->rrdctx_hub_queue = (RRDCONTEXTS *)dictionary_create(
- DICTIONARY_FLAG_DONT_OVERWRITE_VALUE
- |DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE);
+ host->rrdctx_hub_queue = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE);
dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_insert_callback, NULL);
dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_delete_callback, NULL);
dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_conflict_callback, NULL);
- host->rrdctx_post_processing_queue = (RRDCONTEXTS *)dictionary_create(
- DICTIONARY_FLAG_DONT_OVERWRITE_VALUE
- |DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE);
+ host->rrdctx_post_processing_queue = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE);
dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_post_processing_queue_insert_callback, NULL);
dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_post_processing_queue_delete_callback, NULL);
@@ -1331,7 +1322,7 @@ void rrdhost_destroy_rrdcontexts(RRDHOST *host) {
RRDCONTEXT *rc;
dfe_start_write(old, rc) {
- dictionary_del_having_write_lock(old, string2str(rc->id));
+ dictionary_del(old, string2str(rc->id));
}
dfe_done(rc);
dictionary_destroy(old);
@@ -1343,7 +1334,7 @@ void rrdhost_destroy_rrdcontexts(RRDHOST *host) {
RRDCONTEXT *rc;
dfe_start_write(old, rc) {
- dictionary_del_having_write_lock(old, string2str(rc->id));
+ dictionary_del(old, string2str(rc->id));
}
dfe_done(rc);
dictionary_destroy(old);
@@ -1464,6 +1455,35 @@ void rrdcontext_db_rotation(void) {
rrdcontext_next_db_rotation_ut = now_realtime_usec() + FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS * USEC_PER_SEC;
}
+int rrdcontext_foreach_instance_with_rrdset_in_context(RRDHOST *host, const char *context, int (*callback)(RRDSET *st, void *data), void *data) {
+ if(unlikely(!host || !context || !*context || !callback))
+ return -1;
+
+ RRDCONTEXT_ACQUIRED *rca = (RRDCONTEXT_ACQUIRED *)dictionary_get_and_acquire_item((DICTIONARY *)host->rrdctx, context);
+ if(unlikely(!rca)) return -1;
+
+ RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+ if(unlikely(!rc)) return -1;
+
+ int ret = 0;
+ RRDINSTANCE *ri;
+ dfe_start_read(rc->rrdinstances, ri) {
+ if(ri->rrdset) {
+ int r = callback(ri->rrdset, data);
+ if(r >= 0) ret += r;
+ else {
+ ret = r;
+ break;
+ }
+ }
+ }
+ dfe_done(ri);
+
+ rrdcontext_release(rca);
+
+ return ret;
+}
+
// ----------------------------------------------------------------------------
// ACLK interface
@@ -1601,7 +1621,8 @@ struct rrdcontext_to_json {
RRD_FLAGS combined_flags;
};
-static inline int rrdmetric_to_json_callback(const char *id, void *value, void *data) {
+static inline int rrdmetric_to_json_callback(const DICTIONARY_ITEM *item, void *value, void *data) {
+ const char *id = dictionary_acquired_item_name(item);
struct rrdcontext_to_json * t = data;
RRDMETRIC *rm = value;
BUFFER *wb = t->wb;
@@ -1673,7 +1694,9 @@ static inline int rrdmetric_to_json_callback(const char *id, void *value, void *
return 1;
}
-static inline int rrdinstance_to_json_callback(const char *id, void *value, void *data) {
+static inline int rrdinstance_to_json_callback(const DICTIONARY_ITEM *item, void *value, void *data) {
+ const char *id = dictionary_acquired_item_name(item);
+
struct rrdcontext_to_json *t_parent = data;
RRDINSTANCE *ri = value;
BUFFER *wb = t_parent->wb;
@@ -1788,7 +1811,7 @@ static inline int rrdinstance_to_json_callback(const char *id, void *value, void
buffer_strcat(wb, "\"");
}
- if(options & RRDCONTEXT_OPTION_SHOW_LABELS && ri->rrdlabels && dictionary_stats_entries(ri->rrdlabels)) {
+ if(options & RRDCONTEXT_OPTION_SHOW_LABELS && ri->rrdlabels && dictionary_entries(ri->rrdlabels)) {
buffer_sprintf(wb, ",\n\t\t\t\t\t\"labels\": {\n");
rrdlabels_to_buffer(ri->rrdlabels, wb, "\t\t\t\t\t\t", ":", "\"", ",\n", NULL, NULL, NULL, NULL);
buffer_strcat(wb, "\n\t\t\t\t\t}");
@@ -1807,7 +1830,8 @@ static inline int rrdinstance_to_json_callback(const char *id, void *value, void
return 1;
}
-static inline int rrdcontext_to_json_callback(const char *id, void *value, void *data) {
+static inline int rrdcontext_to_json_callback(const DICTIONARY_ITEM *item, void *value, void *data) {
+ const char *id = dictionary_acquired_item_name(item);
struct rrdcontext_to_json *t_parent = data;
RRDCONTEXT *rc = value;
BUFFER *wb = t_parent->wb;
@@ -1974,7 +1998,7 @@ int rrdcontext_to_json(RRDHOST *host, BUFFER *wb, time_t after, time_t before, R
.written = 0,
.now = now_realtime_sec(),
};
- rrdcontext_to_json_callback(context, rc, &t_contexts);
+ rrdcontext_to_json_callback((DICTIONARY_ITEM *)rca, rc, &t_contexts);
rrdcontext_release(rca);
@@ -2300,10 +2324,10 @@ static inline bool rrdinstance_should_be_deleted(RRDINSTANCE *ri) {
if(likely(ri->rrdset))
return false;
- if(unlikely(dictionary_stats_referenced_items(ri->rrdmetrics) != 0))
+ if(unlikely(dictionary_referenced_items(ri->rrdmetrics) != 0))
return false;
- if(unlikely(dictionary_stats_entries(ri->rrdmetrics) != 0))
+ if(unlikely(dictionary_entries(ri->rrdmetrics) != 0))
return false;
if(ri->first_time_t || ri->last_time_t)
@@ -2319,10 +2343,10 @@ static inline bool rrdcontext_should_be_deleted(RRDCONTEXT *rc) {
if(likely(rrd_flag_check(rc, RRD_FLAGS_PREVENTING_DELETIONS)))
return false;
- if(unlikely(dictionary_stats_referenced_items(rc->rrdinstances) != 0))
+ if(unlikely(dictionary_referenced_items(rc->rrdinstances) != 0))
return false;
- if(unlikely(dictionary_stats_entries(rc->rrdinstances) != 0))
+ if(unlikely(dictionary_entries(rc->rrdinstances) != 0))
return false;
if(unlikely(rc->first_time_t || rc->last_time_t))
@@ -2364,7 +2388,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo
dfe_start_write(ri->rrdmetrics, rm) {
if(rrdmetric_should_be_deleted(rm)) {
if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
- if(dictionary_del_having_write_lock(ri->rrdmetrics, string2str(rm->id)) != 0)
+ if(!dictionary_del(ri->rrdmetrics, string2str(rm->id)))
error("RRDCONTEXT: metric '%s' of instance '%s' of context '%s' of host '%s', failed to be deleted from rrdmetrics dictionary.",
string2str(rm->id),
string2str(ri->id),
@@ -2384,7 +2408,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo
if(rrdinstance_should_be_deleted(ri)) {
if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
- if(dictionary_del(rc->rrdinstances, string2str(ri->id)) != 0)
+ if(!dictionary_del(rc->rrdinstances, string2str(ri->id)))
error("RRDCONTEXT: instance '%s' of context '%s' of host '%s', failed to be deleted from rrdmetrics dictionary.",
string2str(ri->id),
string2str(rc->id),
@@ -2405,7 +2429,7 @@ static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jo
rrdcontext_dequeue_from_post_processing(rc);
rrdcontext_delete_from_sql_unsafe(rc);
- if(dictionary_del((DICTIONARY *)host->rrdctx, string2str(rc->id)) != 0)
+ if(!dictionary_del((DICTIONARY *)host->rrdctx, string2str(rc->id)))
error("RRDCONTEXT: context '%s' of host '%s', failed to be deleted from rrdmetrics dictionary.",
string2str(rc->id),
rrdhost_hostname(host));
@@ -2471,7 +2495,7 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL
time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
size_t metrics_active = 0, metrics_deleted = 0;
bool live_retention = true, currently_collected = false;
- if(dictionary_stats_entries(ri->rrdmetrics) > 0) {
+ if(dictionary_entries(ri->rrdmetrics) > 0) {
RRDMETRIC *rm;
dfe_start_read((DICTIONARY *)ri->rrdmetrics, rm) {
if(unlikely(netdata_exit)) break;
@@ -2574,7 +2598,7 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
size_t instances_active = 0, instances_deleted = 0;
bool live_retention = true, currently_collected = false, hidden = true;
- if(dictionary_stats_entries(rc->rrdinstances) > 0) {
+ if(dictionary_entries(rc->rrdinstances) > 0) {
RRDINSTANCE *ri;
dfe_start_reentrant(rc->rrdinstances, ri) {
if(unlikely(netdata_exit)) break;
@@ -2714,7 +2738,7 @@ static void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *fun
rc,
sizeof(*rc));
-#ifdef NETDATA_INTERNAL_CHECKS
+#if(defined(NETDATA_INTERNAL_CHECKS) && defined(LOG_POST_PROCESSING_QUEUE_INSERTIONS))
{
BUFFER *wb_flags = buffer_create(1000);
rrd_flags_to_buffer(flags, wb_flags);
@@ -2912,7 +2936,7 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now
return;
// check if there are queued items to send
- if(!dictionary_stats_entries((DICTIONARY *)host->rrdctx_hub_queue))
+ if(!dictionary_entries((DICTIONARY *)host->rrdctx_hub_queue))
return;
if(!host->node_id)
@@ -2975,7 +2999,7 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now
rrdcontext_unlock(rc);
// delete it from the master dictionary
- if(dictionary_del((DICTIONARY *)host->rrdctx, string2str(rc->id)) != 0)
+ if(!dictionary_del((DICTIONARY *)host->rrdctx, string2str(rc->id)))
error("RRDCONTEXT: '%s' of host '%s' failed to be deleted from rrdcontext dictionary.",
string2str(id), rrdhost_hostname(host));
@@ -3010,7 +3034,10 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now
static void rrdcontext_main_cleanup(void *ptr) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
+
// custom code
+ worker_unregister();
+
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
@@ -3065,12 +3092,13 @@ void *rrdcontext_main(void *ptr) {
worker_is_busy(WORKER_JOB_HOSTS);
if(host->rrdctx_post_processing_queue) {
- pp_queued_contexts_for_all_hosts += dictionary_stats_entries((DICTIONARY *)host->rrdctx_post_processing_queue);
+ pp_queued_contexts_for_all_hosts +=
+ dictionary_entries((DICTIONARY *)host->rrdctx_post_processing_queue);
rrdcontext_post_process_queued_contexts(host);
}
if(host->rrdctx_hub_queue) {
- hub_queued_contexts_for_all_hosts += dictionary_stats_entries((DICTIONARY *)host->rrdctx_hub_queue);
+ hub_queued_contexts_for_all_hosts += dictionary_entries((DICTIONARY *)host->rrdctx_hub_queue);
rrdcontext_dispatch_queued_contexts_to_hub(host, now_ut);
}
}