summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-10-11 18:01:46 +0300
committerGitHub <noreply@github.com>2022-10-11 18:01:46 +0300
commitec95f306f35cff802a7f08bad43411945c0ba977 (patch)
tree1c31bbd518b79c693912f5625210d38148399327 /database
parent849e34209208af2ad7939f221872a49e20321883 (diff)
fix post-processing of contexts (#13807)
Diffstat (limited to 'database')
-rw-r--r--database/rrdcontext.c68
1 files changed, 56 insertions, 12 deletions
diff --git a/database/rrdcontext.c b/database/rrdcontext.c
index 20f20597e6..2cb8afb392 100644
--- a/database/rrdcontext.c
+++ b/database/rrdcontext.c
@@ -155,6 +155,7 @@ rrd_flag_add_remove_atomic(RRD_FLAGS *flags, RRD_FLAGS check, RRD_FLAGS conditio
| RRD_FLAG_DELETED \
| RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED \
| RRD_FLAG_UPDATE_REASON_ZERO_RETENTION \
+ | RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD \
)
#define rrd_flag_set_archived(obj) \
@@ -272,9 +273,9 @@ typedef struct rrdinstance {
DICTIONARY *rrdmetrics;
struct {
- uint32_t collected_metrics; // a temporary variable to detect BEGIN/END without SET
- // don't use it for other purposes
- // it goes up and then resets to zero, on every iteration
+ uint32_t collected_metrics_count; // a temporary variable to detect BEGIN/END without SET
+ // don't use it for other purposes
+ // it goes up and then resets to zero, on every iteration
} internal;
} RRDINSTANCE;
@@ -298,11 +299,19 @@ typedef struct rrdcontext {
RRDHOST *rrdhost;
struct {
+ RRD_FLAGS queued_flags; // the last flags that triggered the post-processing
+ usec_t queued_ut; // the last time this was queued
+ usec_t dequeued_ut; // the last time we sent (or deduplicated) this context
+ size_t executions; // how many times this context has been processed
+ } pp;
+
+ struct {
RRD_FLAGS queued_flags; // the last flags that triggered the queueing
usec_t queued_ut; // the last time this was queued
usec_t delay_calc_ut; // the last time we calculated the scheduled_dispatched_ut
usec_t scheduled_dispatch_ut; // the time it was/is scheduled to be sent
- usec_t dequeued_ut; // the last time we sent (or deduped) this context
+ usec_t dequeued_ut; // the last time we sent (or deduplicated) this context
+ size_t dispatches; // the number of times this has been dispatched to hub
} queue;
netdata_mutex_t mutex;
@@ -639,7 +648,7 @@ static inline void rrdmetric_collected_rrddim(RRDDIM *rd) {
rrd_flag_set_collected(rm);
// we use this variable to detect BEGIN/END without SET
- rm->ri->internal.collected_metrics++;
+ rm->ri->internal.collected_metrics_count++;
rrdmetric_trigger_updates(rm, __FUNCTION__ );
}
@@ -1079,11 +1088,11 @@ static inline void rrdinstance_collected_rrdset(RRDSET *st) {
rrdinstance_updated_rrdset_flags_no_action(ri, st);
- if(unlikely(ri->internal.collected_metrics && !rrd_flag_is_collected(ri)))
+ if(unlikely(ri->internal.collected_metrics_count && !rrd_flag_is_collected(ri)))
rrd_flag_set_collected(ri);
// we use this variable to detect BEGIN/END without SET
- ri->internal.collected_metrics = 0;
+ ri->internal.collected_metrics_count = 0;
rrdinstance_trigger_updates(ri, __FUNCTION__ );
}
@@ -1273,11 +1282,31 @@ static bool rrdcontext_hub_queue_conflict_callback(const DICTIONARY_ITEM *item _
static void rrdcontext_post_processing_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
RRDCONTEXT *rc = context;
rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
+ rc->pp.queued_flags = rc->flags;
+ rc->pp.queued_ut = now_realtime_usec();
}
static void rrdcontext_post_processing_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
RRDCONTEXT *rc = context;
rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
+ rc->pp.dequeued_ut = now_realtime_usec();
+}
+
+static bool rrdcontext_post_processing_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) {
+ RRDCONTEXT *rc = context;
+ bool changed = false;
+
+ if(!(rc->flags & RRD_FLAG_QUEUED_FOR_POST_PROCESSING)) {
+ rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_POST_PROCESSING);
+ changed = true;
+ }
+
+ if(rc->pp.queued_flags != rc->flags) {
+ rc->pp.queued_flags |= rc->flags;
+ changed = true;
+ }
+
+ return changed;
}
void rrdhost_create_rrdcontexts(RRDHOST *host) {
@@ -1291,15 +1320,14 @@ void rrdhost_create_rrdcontexts(RRDHOST *host) {
dictionary_register_react_callback((DICTIONARY *)host->rrdctx, rrdcontext_react_callback, host);
host->rrdctx_hub_queue = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE);
-
dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_insert_callback, NULL);
dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_delete_callback, NULL);
dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_hub_queue_conflict_callback, NULL);
host->rrdctx_post_processing_queue = (RRDCONTEXTS *)dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE);
-
- dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_post_processing_queue_insert_callback, NULL);
- dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_hub_queue, rrdcontext_post_processing_queue_delete_callback, NULL);
+ dictionary_register_insert_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_insert_callback, NULL);
+ dictionary_register_delete_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_delete_callback, NULL);
+ dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx_post_processing_queue, rrdcontext_post_processing_queue_conflict_callback, NULL);
}
void rrdhost_destroy_rrdcontexts(RRDHOST *host) {
@@ -1396,7 +1424,6 @@ void rrdcontext_host_child_connected(RRDHOST *host) {
}
void rrdcontext_host_child_disconnected(RRDHOST *host) {
-
rrdcontext_recalculate_host_retention(host, RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD, false);
}
@@ -1894,14 +1921,29 @@ static inline int rrdcontext_to_json_callback(const DICTIONARY_ITEM *item, void
",\n\t\t\t\"last_queued\":%llu"
",\n\t\t\t\"scheduled_dispatch\":%llu"
",\n\t\t\t\"last_dequeued\":%llu"
+ ",\n\t\t\t\"dispatches\":%zu"
",\n\t\t\t\"hub_version\":%"PRIu64""
",\n\t\t\t\"version\":%"PRIu64""
, rc->queue.queued_ut / USEC_PER_SEC
, rc->queue.scheduled_dispatch_ut / USEC_PER_SEC
, rc->queue.dequeued_ut / USEC_PER_SEC
+ , rc->queue.dispatches
, rc->hub.version
, rc->version
);
+
+ buffer_strcat(wb, ",\n\t\t\t\"pp_reasons\":\"");
+ rrd_reasons_to_buffer(rc->pp.queued_flags, wb);
+ buffer_strcat(wb, "\"");
+
+ buffer_sprintf(wb,
+ ",\n\t\t\t\"pp_last_queued\":%llu"
+ ",\n\t\t\t\"pp_last_dequeued\":%llu"
+ ",\n\t\t\t\"pp_executed\":%zu"
+ , rc->pp.queued_ut / USEC_PER_SEC
+ , rc->pp.dequeued_ut / USEC_PER_SEC
+ , rc->pp.executions
+ );
}
rrdcontext_unlock(rc);
@@ -2609,6 +2651,7 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
}
rrdcontext_lock(rc);
+ rc->pp.executions++;
if(unlikely(!instances_active)) {
// we had some instances, but they are gone now...
@@ -2931,6 +2974,7 @@ static void rrdcontext_dispatch_queued_contexts_to_hub(RRDHOST *host, usec_t now
rrdcontext_message_send_unsafe(rc, false, bundle);
messages_added++;
+ rc->queue.dispatches++;
rc->queue.dequeued_ut = now_ut;
}
else