summaryrefslogtreecommitdiffstats
path: root/database/contexts/worker.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-03-02 22:50:48 +0200
committerGitHub <noreply@github.com>2023-03-02 22:50:48 +0200
commit021e252fc5d18a7225c0f4c975b3281016861d3c (patch)
tree63f92adc27419ca9df464635cd85424f52c94179 /database/contexts/worker.c
parentc4d8d35b9f065f2a847f2780acb4342dabdfd34c (diff)
/api/v2/contexts (#14592)
* preparation for /api/v2/contexts * working /api/v2/contexts * add anomaly rate information in all statistics; when sum-count is requested, return sums and counts instead of averages * minor fix * query targegt now accurately counts hosts, contexts, instances, dimensions, metrics * cleanup /api/v2/contexts * full text search with /api/v2/contexts * simple patterns now support the option to search ignoring case * full text search API with /api/v2/q * simple pattern execution optimization * do not show q when not given * full text search accounting * separated /api/v2/nodes from /api/v2/contexts * fix ssv queries for group_by * count query instances queried and failed per context and host * split rrdcontext.c to multiple files * add query totals * fix anomaly rate calculation; provide "ni" for indexing hosts * do not generate zero valued members * faster calculation of anomaly rate; by just summing integers for each db points and doing math once for every generated point * fix typo when printing dimensions totals * added option minify to remove spaces and newlines fron JSON output * send instance ids and names when they differ * do not add in query target dimensions, instances, contexts and hosts for which there is no retention in the current timeframe * fix for the previous + renames and code cleanup * when a dimension is filtered, include in the response all the other dimensions that are selectable * do not add nodes that do not have retention in the current window * move selection of dimensions to query_dimension_add(), instead of query_metric_add() * increase the pre-processing capacity of queries * generate instance fqdn ids and names only when they are needed * provide detailed statistics about tiers retention, queries, points, update_every * late allocation of query dimensions * cleanup * more cleanup * support for annotations per displayed point, RESET and PARTIAL * new type annotations * if a chart is not linked to contexts and it is collected, link it when it is collected * make ML run reentrant * make ML rrdr query synchronous * optimize replication memory allocation of replication_sort_entry * change units to percentage, when requesting a coefficinet of variation, or a percentage query * initialize replication before starting main threads * properly decrement no room requests counter * propagate the non-zero flag to group-by * the same by avoiding the extra loop * respect non-zero in all dimension arrays * remove dictionary garbage collection from dictionary_entries() and dictionary_version() * be more verbose when jv2 indexing is postponed * prevent infinite loop * use hidden dimensions even when dimensions pattern is unset * traverse hosts using dictionaries * fix dictionary unittests
Diffstat (limited to 'database/contexts/worker.c')
-rw-r--r--database/contexts/worker.c1090
1 files changed, 1090 insertions, 0 deletions
diff --git a/database/contexts/worker.c b/database/contexts/worker.c
new file mode 100644
index 0000000000..b9f545c692
--- /dev/null
+++ b/database/contexts/worker.c
@@ -0,0 +1,1090 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "internal.h"
+
+static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc);
+
+static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused);
+
+static void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc);
+
+static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc);
+static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs);
+
+static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jobs);
+static void rrdcontext_garbage_collect_for_all_hosts(void);
+
+extern usec_t rrdcontext_next_db_rotation_ut;
+
+// ----------------------------------------------------------------------------
+// load from SQL
+
+static void rrdinstance_load_clabel(SQL_CLABEL_DATA *sld, void *data) {
+ RRDINSTANCE *ri = data;
+ rrdlabels_add(ri->rrdlabels, sld->label_key, sld->label_value, sld->label_source);
+}
+
+static void rrdinstance_load_dimension(SQL_DIMENSION_DATA *sd, void *data) {
+ RRDINSTANCE *ri = data;
+
+ RRDMETRIC trm = {
+ .id = string_strdupz(sd->id),
+ .name = string_strdupz(sd->name),
+ .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL, // no need for atomic
+ };
+ if(sd->hidden) trm.flags |= RRD_FLAG_HIDDEN;
+
+ uuid_copy(trm.uuid, sd->dim_id);
+
+ dictionary_set(ri->rrdmetrics, string2str(trm.id), &trm, sizeof(trm));
+}
+
+static void rrdinstance_load_chart_callback(SQL_CHART_DATA *sc, void *data) {
+ RRDHOST *host = data;
+
+ RRDCONTEXT tc = {
+ .id = string_strdupz(sc->context),
+ .title = string_strdupz(sc->title),
+ .units = string_strdupz(sc->units),
+ .family = string_strdupz(sc->family),
+ .priority = sc->priority,
+ .chart_type = sc->chart_type,
+ .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL, // no need for atomics
+ .rrdhost = host,
+ };
+
+ RRDCONTEXT_ACQUIRED *rca = (RRDCONTEXT_ACQUIRED *)dictionary_set_and_acquire_item(host->rrdctx.contexts, string2str(tc.id), &tc, sizeof(tc));
+ RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
+
+ RRDINSTANCE tri = {
+ .id = string_strdupz(sc->id),
+ .name = string_strdupz(sc->name),
+ .title = string_strdupz(sc->title),
+ .units = string_strdupz(sc->units),
+ .family = string_strdupz(sc->family),
+ .chart_type = sc->chart_type,
+ .priority = sc->priority,
+ .update_every_s = sc->update_every,
+ .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL, // no need for atomics
+ };
+ uuid_copy(tri.uuid, sc->chart_id);
+
+ RRDINSTANCE_ACQUIRED *ria = (RRDINSTANCE_ACQUIRED *)dictionary_set_and_acquire_item(rc->rrdinstances, sc->id, &tri, sizeof(tri));
+ RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
+
+ ctx_get_dimension_list(&ri->uuid, rrdinstance_load_dimension, ri);
+ ctx_get_label_list(&ri->uuid, rrdinstance_load_clabel, ri);
+ rrdinstance_trigger_updates(ri, __FUNCTION__ );
+ rrdinstance_release(ria);
+ rrdcontext_release(rca);
+}
+
+static void rrdcontext_load_context_callback(VERSIONED_CONTEXT_DATA *ctx_data, void *data) {
+ RRDHOST *host = data;
+ (void)host;
+
+ RRDCONTEXT trc = {
+ .id = string_strdupz(ctx_data->id),
+ .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL, // no need for atomics
+
+ // no need to set more data here
+ // we only need the hub data
+
+ .hub = *ctx_data,
+ };
+ dictionary_set(host->rrdctx.contexts, string2str(trc.id), &trc, sizeof(trc));
+}
+
+void rrdhost_load_rrdcontext_data(RRDHOST *host) {
+ if(host->rrdctx.contexts) return;
+
+ rrdhost_create_rrdcontexts(host);
+ ctx_get_context_list(&host->host_uuid, rrdcontext_load_context_callback, host);
+ ctx_get_chart_list(&host->host_uuid, rrdinstance_load_chart_callback, host);
+
+ RRDCONTEXT *rc;
+ dfe_start_read(host->rrdctx.contexts, rc) {
+ rrdcontext_trigger_updates(rc, __FUNCTION__ );
+ }
+ dfe_done(rc);
+
+ rrdcontext_garbage_collect_single_host(host, false);
+}
+
+// ----------------------------------------------------------------------------
+// version hash calculation
+
+uint64_t rrdcontext_version_hash_with_callback(
+ RRDHOST *host,
+ void (*callback)(RRDCONTEXT *, bool, void *),
+ bool snapshot,
+ void *bundle) {
+
+ if(unlikely(!host || !host->rrdctx.contexts)) return 0;
+
+ RRDCONTEXT *rc;
+ uint64_t hash = 0;
+
+ // loop through all contexts of the host
+ dfe_start_read(host->rrdctx.contexts, rc) {
+
+ rrdcontext_lock(rc);
+
+ if(unlikely(rrd_flag_check(rc, RRD_FLAG_HIDDEN))) {
+ rrdcontext_unlock(rc);
+ continue;
+ }
+
+ if(unlikely(callback))
+ callback(rc, snapshot, bundle);
+
+ // skip any deleted contexts
+ if(unlikely(rrd_flag_is_deleted(rc))) {
+ rrdcontext_unlock(rc);
+ continue;
+ }
+
+ // we use rc->hub.* which has the latest
+ // metadata we have sent to the hub
+
+ // if a context is currently queued, rc->hub.* does NOT
+ // reflect the queued changes. rc->hub.* is updated with
+ // their metadata, after messages are dispatched to hub.
+
+ // when the context is being collected,
+ // rc->hub.last_time_t is already zero
+
+ hash += rc->hub.version + rc->hub.last_time_s - rc->hub.first_time_s;
+
+ rrdcontext_unlock(rc);
+
+ }
+ dfe_done(rc);
+
+ return hash;
+}
+
+// ----------------------------------------------------------------------------
+// retention recalculation
+
+void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, bool worker_jobs) {
+ rrdcontext_post_process_updates(rc, true, reason, worker_jobs);
+}
+
+void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, bool worker_jobs) {
+ if(unlikely(!host || !host->rrdctx.contexts)) return;
+
+ RRDCONTEXT *rc;
+ dfe_start_read(host->rrdctx.contexts, rc) {
+ rrdcontext_recalculate_context_retention(rc, reason, worker_jobs);
+ }
+ dfe_done(rc);
+}
+
+static void rrdcontext_recalculate_retention_all_hosts(void) {
+ rrdcontext_next_db_rotation_ut = 0;
+ RRDHOST *host;
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ worker_is_busy(WORKER_JOB_RETENTION);
+ rrdcontext_recalculate_host_retention(host, RRD_FLAG_UPDATE_REASON_DB_ROTATION, true);
+ }
+ dfe_done(host);
+}
+
+// ----------------------------------------------------------------------------
+// garbage collector
+
+bool rrdmetric_update_retention(RRDMETRIC *rm) {
+ time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
+
+ if(rm->rrddim) {
+ min_first_time_t = rrddim_first_entry_s(rm->rrddim);
+ max_last_time_t = rrddim_last_entry_s(rm->rrddim);
+ }
+ else {
+ RRDHOST *rrdhost = rm->ri->rc->rrdhost;
+ for (size_t tier = 0; tier < storage_tiers; tier++) {
+ STORAGE_ENGINE *eng = rrdhost->db[tier].eng;
+
+ time_t first_time_t, last_time_t;
+ if (eng->api.metric_retention_by_uuid(rrdhost->db[tier].instance, &rm->uuid, &first_time_t, &last_time_t)) {
+ if (first_time_t < min_first_time_t)
+ min_first_time_t = first_time_t;
+
+ if (last_time_t > max_last_time_t)
+ max_last_time_t = last_time_t;
+ }
+ }
+ }
+
+ if((min_first_time_t == LONG_MAX || min_first_time_t == 0) && max_last_time_t == 0)
+ return false;
+
+ if(min_first_time_t == LONG_MAX)
+ min_first_time_t = 0;
+
+ if(min_first_time_t > max_last_time_t) {
+ internal_error(true, "RRDMETRIC: retention of '%s' is flipped, first_time_t = %ld, last_time_t = %ld", string2str(rm->id), min_first_time_t, max_last_time_t);
+ time_t tmp = min_first_time_t;
+ min_first_time_t = max_last_time_t;
+ max_last_time_t = tmp;
+ }
+
+ // check if retention changed
+
+ if (min_first_time_t != rm->first_time_s) {
+ rm->first_time_s = min_first_time_t;
+ rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if (max_last_time_t != rm->last_time_s) {
+ rm->last_time_s = max_last_time_t;
+ rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ if(unlikely(!rm->first_time_s && !rm->last_time_s))
+ rrd_flag_set_deleted(rm, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+
+ rrd_flag_set(rm, RRD_FLAG_LIVE_RETENTION);
+
+ return true;
+}
+
+static inline bool rrdmetric_should_be_deleted(RRDMETRIC *rm) {
+ if(likely(!rrd_flag_check(rm, RRD_FLAGS_REQUIRED_FOR_DELETIONS)))
+ return false;
+
+ if(likely(rrd_flag_check(rm, RRD_FLAGS_PREVENTING_DELETIONS)))
+ return false;
+
+ if(likely(rm->rrddim))
+ return false;
+
+ rrdmetric_update_retention(rm);
+ if(rm->first_time_s || rm->last_time_s)
+ return false;
+
+ return true;
+}
+
+static inline bool rrdinstance_should_be_deleted(RRDINSTANCE *ri) {
+ if(likely(!rrd_flag_check(ri, RRD_FLAGS_REQUIRED_FOR_DELETIONS)))
+ return false;
+
+ if(likely(rrd_flag_check(ri, RRD_FLAGS_PREVENTING_DELETIONS)))
+ return false;
+
+ if(likely(ri->rrdset))
+ return false;
+
+ if(unlikely(dictionary_referenced_items(ri->rrdmetrics) != 0))
+ return false;
+
+ if(unlikely(dictionary_entries(ri->rrdmetrics) != 0))
+ return false;
+
+ if(ri->first_time_s || ri->last_time_s)
+ return false;
+
+ return true;
+}
+
+static inline bool rrdcontext_should_be_deleted(RRDCONTEXT *rc) {
+ if(likely(!rrd_flag_check(rc, RRD_FLAGS_REQUIRED_FOR_DELETIONS)))
+ return false;
+
+ if(likely(rrd_flag_check(rc, RRD_FLAGS_PREVENTING_DELETIONS)))
+ return false;
+
+ if(unlikely(dictionary_referenced_items(rc->rrdinstances) != 0))
+ return false;
+
+ if(unlikely(dictionary_entries(rc->rrdinstances) != 0))
+ return false;
+
+ if(unlikely(rc->first_time_s || rc->last_time_s))
+ return false;
+
+ return true;
+}
+
+void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc) {
+ // we need to refresh the string pointers in rc->hub
+ // in case the context changed values
+ rc->hub.id = string2str(rc->id);
+ rc->hub.title = string2str(rc->title);
+ rc->hub.units = string2str(rc->units);
+ rc->hub.family = string2str(rc->family);
+
+ // delete it from SQL
+ if(ctx_delete_context(&rc->rrdhost->host_uuid, &rc->hub) != 0)
+ error("RRDCONTEXT: failed to delete context '%s' version %"PRIu64" from SQL.", rc->hub.id, rc->hub.version);
+}
+
+static void rrdcontext_garbage_collect_single_host(RRDHOST *host, bool worker_jobs) {
+
+ internal_error(true, "RRDCONTEXT: garbage collecting context structures of host '%s'", rrdhost_hostname(host));
+
+ RRDCONTEXT *rc;
+ dfe_start_reentrant(host->rrdctx.contexts, rc) {
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP);
+
+ rrdcontext_lock(rc);
+
+ RRDINSTANCE *ri;
+ dfe_start_reentrant(rc->rrdinstances, ri) {
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ RRDMETRIC *rm;
+ dfe_start_write(ri->rrdmetrics, rm) {
+ if(rrdmetric_should_be_deleted(rm)) {
+ if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
+ if(!dictionary_del(ri->rrdmetrics, string2str(rm->id)))
+ error("RRDCONTEXT: metric '%s' of instance '%s' of context '%s' of host '%s', failed to be deleted from rrdmetrics dictionary.",
+ string2str(rm->id),
+ string2str(ri->id),
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ else
+ internal_error(
+ true,
+ "RRDCONTEXT: metric '%s' of instance '%s' of context '%s' of host '%s', deleted from rrdmetrics dictionary.",
+ string2str(rm->id),
+ string2str(ri->id),
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ }
+ }
+ dfe_done(rm);
+
+ if(rrdinstance_should_be_deleted(ri)) {
+ if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
+ if(!dictionary_del(rc->rrdinstances, string2str(ri->id)))
+ error("RRDCONTEXT: instance '%s' of context '%s' of host '%s', failed to be deleted from rrdmetrics dictionary.",
+ string2str(ri->id),
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ else
+ internal_error(
+ true,
+ "RRDCONTEXT: instance '%s' of context '%s' of host '%s', deleted from rrdmetrics dictionary.",
+ string2str(ri->id),
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ }
+ }
+ dfe_done(ri);
+
+ if(unlikely(rrdcontext_should_be_deleted(rc))) {
+ if(worker_jobs) worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
+ rrdcontext_dequeue_from_post_processing(rc);
+ rrdcontext_delete_from_sql_unsafe(rc);
+
+ if(!dictionary_del(host->rrdctx.contexts, string2str(rc->id)))
+ error("RRDCONTEXT: context '%s' of host '%s', failed to be deleted from rrdmetrics dictionary.",
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ else
+ internal_error(
+ true,
+ "RRDCONTEXT: context '%s' of host '%s', deleted from rrdmetrics dictionary.",
+ string2str(rc->id),
+ rrdhost_hostname(host));
+ }
+
+ // the item is referenced in the dictionary
+ // so, it is still here to unlock, even if we have deleted it
+ rrdcontext_unlock(rc);
+ }
+ dfe_done(rc);
+}
+
+static void rrdcontext_garbage_collect_for_all_hosts(void) {
+ RRDHOST *host;
+ dfe_start_reentrant(rrdhost_root_index, host) {
+ rrdcontext_garbage_collect_single_host(host, true);
+ }
+ dfe_done(host);
+}
+
+// ----------------------------------------------------------------------------
+// post processing
+
+static void rrdmetric_process_updates(RRDMETRIC *rm, bool force, RRD_FLAGS reason, bool worker_jobs) {
+ if(reason != RRD_FLAG_NONE)
+ rrd_flag_set_updated(rm, reason);
+
+ if(!force && !rrd_flag_is_updated(rm) && rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION) && !rrd_flag_check(rm, RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION))
+ return;
+
+ if(worker_jobs)
+ worker_is_busy(WORKER_JOB_PP_METRIC);
+
+ if(reason & RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD) {
+ rrd_flag_set_archived(rm);
+ rrd_flag_set(rm, RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD);
+ }
+ if(rrd_flag_is_deleted(rm) && (reason & RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION))
+ rrd_flag_set_archived(rm);
+
+ rrdmetric_update_retention(rm);
+
+ rrd_flag_unset_updated(rm);
+}
+
+static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FLAGS reason, bool worker_jobs) {
+ if(reason != RRD_FLAG_NONE)
+ rrd_flag_set_updated(ri, reason);
+
+ if(!force && !rrd_flag_is_updated(ri) && rrd_flag_check(ri, RRD_FLAG_LIVE_RETENTION))
+ return;
+
+ if(worker_jobs)
+ worker_is_busy(WORKER_JOB_PP_INSTANCE);
+
+ time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
+ size_t metrics_active = 0, metrics_deleted = 0;
+ bool live_retention = true, currently_collected = false;
+ if(dictionary_entries(ri->rrdmetrics) > 0) {
+ RRDMETRIC *rm;
+ dfe_start_read((DICTIONARY *)ri->rrdmetrics, rm) {
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ RRD_FLAGS reason_to_pass = reason;
+ if(rrd_flag_check(ri, RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION))
+ reason_to_pass |= RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION;
+
+ rrdmetric_process_updates(rm, force, reason_to_pass, worker_jobs);
+
+ if(unlikely(!rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION)))
+ live_retention = false;
+
+ if (unlikely((rrdmetric_should_be_deleted(rm)))) {
+ metrics_deleted++;
+ continue;
+ }
+
+ if(!currently_collected && rrd_flag_check(rm, RRD_FLAG_COLLECTED) && rm->first_time_s)
+ currently_collected = true;
+
+ metrics_active++;
+
+ if (rm->first_time_s && rm->first_time_s < min_first_time_t)
+ min_first_time_t = rm->first_time_s;
+
+ if (rm->last_time_s && rm->last_time_s > max_last_time_t)
+ max_last_time_t = rm->last_time_s;
+ }
+ dfe_done(rm);
+ }
+
+ if(unlikely(live_retention && !rrd_flag_check(ri, RRD_FLAG_LIVE_RETENTION)))
+ rrd_flag_set(ri, RRD_FLAG_LIVE_RETENTION);
+ else if(unlikely(!live_retention && rrd_flag_check(ri, RRD_FLAG_LIVE_RETENTION)))
+ rrd_flag_clear(ri, RRD_FLAG_LIVE_RETENTION);
+
+ if(unlikely(!metrics_active)) {
+ // no metrics available
+
+ if(ri->first_time_s) {
+ ri->first_time_s = 0;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if(ri->last_time_s) {
+ ri->last_time_s = 0;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ rrd_flag_set_deleted(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+ }
+ else {
+ // we have active metrics...
+
+ if (unlikely(min_first_time_t == LONG_MAX))
+ min_first_time_t = 0;
+
+ if (unlikely(min_first_time_t == 0 || max_last_time_t == 0)) {
+ if(ri->first_time_s) {
+ ri->first_time_s = 0;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if(ri->last_time_s) {
+ ri->last_time_s = 0;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ if(likely(live_retention))
+ rrd_flag_set_deleted(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+ }
+ else {
+ rrd_flag_clear(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+
+ if (unlikely(ri->first_time_s != min_first_time_t)) {
+ ri->first_time_s = min_first_time_t;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if (unlikely(ri->last_time_s != max_last_time_t)) {
+ ri->last_time_s = max_last_time_t;
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ if(likely(currently_collected))
+ rrd_flag_set_collected(ri);
+ else
+ rrd_flag_set_archived(ri);
+ }
+ }
+
+ rrd_flag_unset_updated(ri);
+}
+
+static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAGS reason, bool worker_jobs) {
+ if(reason != RRD_FLAG_NONE)
+ rrd_flag_set_updated(rc, reason);
+
+ if(worker_jobs)
+ worker_is_busy(WORKER_JOB_PP_CONTEXT);
+
+ size_t min_priority_collected = LONG_MAX;
+ size_t min_priority_not_collected = LONG_MAX;
+ size_t min_priority = LONG_MAX;
+ time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
+ size_t instances_active = 0, instances_deleted = 0;
+ bool live_retention = true, currently_collected = false, hidden = true;
+ if(dictionary_entries(rc->rrdinstances) > 0) {
+ RRDINSTANCE *ri;
+ dfe_start_reentrant(rc->rrdinstances, ri) {
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ RRD_FLAGS reason_to_pass = reason;
+ if(rrd_flag_check(rc, RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION))
+ reason_to_pass |= RRD_FLAG_UPDATE_REASON_UPDATE_RETENTION;
+
+ rrdinstance_post_process_updates(ri, force, reason_to_pass, worker_jobs);
+
+ if(unlikely(hidden && !rrd_flag_check(ri, RRD_FLAG_HIDDEN)))
+ hidden = false;
+
+ if(unlikely(live_retention && !rrd_flag_check(ri, RRD_FLAG_LIVE_RETENTION)))
+ live_retention = false;
+
+ if (unlikely(rrdinstance_should_be_deleted(ri))) {
+ instances_deleted++;
+ continue;
+ }
+
+ if(unlikely(!currently_collected && rrd_flag_is_collected(ri) && ri->first_time_s))
+ currently_collected = true;
+
+ internal_error(rc->units != ri->units,
+ "RRDCONTEXT: '%s' rrdinstance '%s' has different units, context '%s', instance '%s'",
+ string2str(rc->id), string2str(ri->id),
+ string2str(rc->units), string2str(ri->units));
+
+ instances_active++;
+
+ if (ri->priority >= RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY) {
+ if(rrd_flag_check(ri, RRD_FLAG_COLLECTED)) {
+ if(ri->priority < min_priority_collected)
+ min_priority_collected = ri->priority;
+ }
+ else {
+ if(ri->priority < min_priority_not_collected)
+ min_priority_not_collected = ri->priority;
+ }
+ }
+
+ if (ri->first_time_s && ri->first_time_s < min_first_time_t)
+ min_first_time_t = ri->first_time_s;
+
+ if (ri->last_time_s && ri->last_time_s > max_last_time_t)
+ max_last_time_t = ri->last_time_s;
+ }
+ dfe_done(ri);
+
+ if(min_priority_collected != LONG_MAX)
+ // use the collected priority
+ min_priority = min_priority_collected;
+ else
+ // use the non-collected priority
+ min_priority = min_priority_not_collected;
+ }
+
+ {
+ bool previous_hidden = rrd_flag_check(rc, RRD_FLAG_HIDDEN);
+ if (hidden != previous_hidden) {
+ if (hidden && !rrd_flag_check(rc, RRD_FLAG_HIDDEN))
+ rrd_flag_set(rc, RRD_FLAG_HIDDEN);
+ else if (!hidden && rrd_flag_check(rc, RRD_FLAG_HIDDEN))
+ rrd_flag_clear(rc, RRD_FLAG_HIDDEN);
+ }
+
+ bool previous_live_retention = rrd_flag_check(rc, RRD_FLAG_LIVE_RETENTION);
+ if (live_retention != previous_live_retention) {
+ if (live_retention && !rrd_flag_check(rc, RRD_FLAG_LIVE_RETENTION))
+ rrd_flag_set(rc, RRD_FLAG_LIVE_RETENTION);
+ else if (!live_retention && rrd_flag_check(rc, RRD_FLAG_LIVE_RETENTION))
+ rrd_flag_clear(rc, RRD_FLAG_LIVE_RETENTION);
+ }
+ }
+
+ rrdcontext_lock(rc);
+ rc->pp.executions++;
+
+ if(unlikely(!instances_active)) {
+ // we had some instances, but they are gone now...
+
+ if(rc->first_time_s) {
+ rc->first_time_s = 0;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if(rc->last_time_s) {
+ rc->last_time_s = 0;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ rrd_flag_set_deleted(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+ }
+ else {
+ // we have some active instances...
+
+ if (unlikely(min_first_time_t == LONG_MAX))
+ min_first_time_t = 0;
+
+ if (unlikely(min_first_time_t == 0 && max_last_time_t == 0)) {
+ if(rc->first_time_s) {
+ rc->first_time_s = 0;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if(rc->last_time_s) {
+ rc->last_time_s = 0;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ rrd_flag_set_deleted(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+ }
+ else {
+ rrd_flag_clear(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
+
+ if (unlikely(rc->first_time_s != min_first_time_t)) {
+ rc->first_time_s = min_first_time_t;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
+ }
+
+ if (rc->last_time_s != max_last_time_t) {
+ rc->last_time_s = max_last_time_t;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
+ }
+
+ if(likely(currently_collected))
+ rrd_flag_set_collected(rc);
+ else
+ rrd_flag_set_archived(rc);
+ }
+
+ if (min_priority != LONG_MAX && rc->priority != min_priority) {
+ rc->priority = min_priority;
+ rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
+ }
+ }
+
+ if(unlikely(rrd_flag_is_updated(rc) && rc->rrdhost->rrdctx.hub_queue)) {
+ if(check_if_cloud_version_changed_unsafe(rc, false)) {
+ rc->version = rrdcontext_get_next_version(rc);
+ dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.hub_queue,
+ string2str(rc->id), rc, sizeof(*rc));
+ }
+ }
+
+ rrd_flag_unset_updated(rc);
+ rrdcontext_unlock(rc);
+}
+
+void rrdcontext_queue_for_post_processing(RRDCONTEXT *rc, const char *function __maybe_unused, RRD_FLAGS flags __maybe_unused) {
+ if(unlikely(!rc->rrdhost->rrdctx.pp_queue)) return;
+
+ if(!rrd_flag_check(rc, RRD_FLAG_QUEUED_FOR_PP)) {
+ dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx.pp_queue,
+ string2str(rc->id),
+ rc,
+ sizeof(*rc));
+
+#if(defined(NETDATA_INTERNAL_CHECKS) && defined(LOG_POST_PROCESSING_QUEUE_INSERTIONS))
+ {
+ BUFFER *wb_flags = buffer_create(1000);
+ rrd_flags_to_buffer(flags, wb_flags);
+
+ BUFFER *wb_reasons = buffer_create(1000);
+ rrd_reasons_to_buffer(flags, wb_reasons);
+
+ internal_error(true, "RRDCONTEXT: '%s' update triggered by function %s(), due to flags: %s, reasons: %s",
+ string2str(rc->id), function,
+ buffer_tostring(wb_flags),
+ buffer_tostring(wb_reasons));
+
+ buffer_free(wb_reasons);
+ buffer_free(wb_flags);
+ }
+#endif
+ }
+}
+
+static void rrdcontext_dequeue_from_post_processing(RRDCONTEXT *rc) {
+ if(unlikely(!rc->rrdhost->rrdctx.pp_queue)) return;
+ dictionary_del(rc->rrdhost->rrdctx.pp_queue, string2str(rc->id));
+}
+
+static void rrdcontext_post_process_queued_contexts(RRDHOST *host) {
+ if(unlikely(!host->rrdctx.pp_queue)) return;
+
+ RRDCONTEXT *rc;
+ dfe_start_reentrant(host->rrdctx.pp_queue, rc) {
+ if(unlikely(!service_running(SERVICE_CONTEXT))) break;
+
+ rrdcontext_dequeue_from_post_processing(rc);
+ rrdcontext_post_process_updates(rc, false, RRD_FLAG_NONE, true);
+ }
+ dfe_done(rc);
+}
+
+// ----------------------------------------------------------------------------
+// dispatching contexts to cloud
+
+static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc) {
+ time_t now = now_realtime_sec();
+ uint64_t version = MAX(rc->version, rc->hub.version);
+ version = MAX((uint64_t)now, version);
+ version++;
+ return version;
+}
+
+void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused, void *bundle __maybe_unused) {
+
+ // save it, so that we know the last version we sent to hub
+ rc->version = rc->hub.version = rrdcontext_get_next_version(rc);
+ rc->hub.id = string2str(rc->id);
+ rc->hub.title = string2str(rc->title);
+ rc->hub.units = string2str(rc->units);
+ rc->hub.family = string2str(rc->family);
+ rc->hub.chart_type = rrdset_type_name(rc->chart_type);
+ rc->hub.priority = rc->priority;
+ rc->hub.first_time_s = rc->first_time_s;
+ rc->hub.last_time_s = rrd_flag_is_collected(rc) ? 0 : rc->last_time_s;
+ rc->hub.deleted = rrd_flag_is_deleted(rc) ? true : false;
+
+#ifdef ENABLE_ACLK
+ struct context_updated message = {
+ .id = rc->hub.id,
+ .version = rc->hub.version,
+ .title = rc->hub.title,
+ .units = rc->hub.units,
+ .family = rc->hub.family,
+ .chart_type = rc->hub.chart_type,
+ .priority = rc->hub.priority,
+ .first_entry = rc->hub.first_time_s,
+ .last_entry = rc->hub.last_time_s,
+ .deleted = rc->hub.deleted,
+ };
+
+ if(likely(!rrd_flag_check(rc, RRD_FLAG_HIDDEN))) {
+ if (snapshot) {
+ if (!rc->hub.deleted)
+ contexts_snapshot_add_ctx_update(bundle, &message);
+ }
+ else
+ contexts_updated_add_ctx_update(bundle, &message);
+ }
+#endif
+
+ // store it to SQL
+
+ if(rrd_flag_is_deleted(rc))
+ rrdcontext_delete_from_sql_unsafe(rc);
+
+ else if (ctx_store_context(&rc->rrdhost->host_uuid, &rc->hub) != 0)
+ error("RRDCONTEXT: failed to save context '%s' version %"PRIu64" to SQL.", rc->hub.id, rc->hub.version);
+}
+
+static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused) {
+ bool id_changed = false,
+ title_changed = false,
+ units_changed = false,
+ family_changed = false,
+ chart_type_changed = false,
+ priority_changed = false,
+ first_time_changed = false,
+ last_time_changed = false,
+ deleted_changed = false;
+
+ RRD_FLAGS flags = rrd_flags_get(rc);
+
+ if(unlikely(string2str(rc->id) != rc->hub.id))
+ id_changed = true;
+
+ if(unlikely(string2str(rc->title) != rc->hub.title))
+ title_changed = true;
+
+ if(unlikely(string2str(rc->units) != rc->hub.units))
+ units_changed = true;
+
+ if(unlikely(string2str(rc->family) != rc->hub.family))
+ family_changed = true;
+
+ if(unlikely(rrdset_type_name(rc->chart_type) != rc->hub.chart_type))
+ chart_type_changed = true;
+
+ if(unlikely(rc->priority != rc->hub.priority))
+ priority_chan