summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-14 01:22:03 +0200
committerGitHub <noreply@github.com>2022-11-14 01:22:03 +0200
commitcbebc18ca3e52ec5ab41272ef85a5fd45d8d3b10 (patch)
tree521cd5e364b28455015857245fd7a1e7ece8a476 /database
parent1dcb9d7ece382cf52a27e6ec9a4c795fd5902a1e (diff)
replication improvements (#13989)
contexts are not set to collected until replication finishes for them; sender thread disables cancelability while replication queries are executed;
Diffstat (limited to 'database')
-rw-r--r--database/rrdcontext.c34
-rw-r--r--database/rrdcontext.h2
-rw-r--r--database/rrdset.c5
3 files changed, 32 insertions, 9 deletions
diff --git a/database/rrdcontext.c b/database/rrdcontext.c
index fb283d93f3..d39585cdc4 100644
--- a/database/rrdcontext.c
+++ b/database/rrdcontext.c
@@ -63,9 +63,7 @@ typedef enum {
RRD_FLAG_UPDATE_REASON_DB_ROTATION = (1 << 28), // this context changed because of a db rotation
RRD_FLAG_UPDATE_REASON_UNUSED = (1 << 29), // this context is not used anymore
RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS = (1 << 30), // this context is not used anymore
-
- // DO NOT ADD (1 << 31) or bigger!
- // runtime error: left shift of 1 by 31 places cannot be represented in type 'int'
+ RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION = (1 << 31), // this object has updated retention
} RRD_FLAGS;
#define RRD_FLAG_ALL_UPDATE_REASONS ( \
@@ -229,6 +227,7 @@ static struct rrdcontext_reason {
{ RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD, "child disconnected", 65 * USEC_PER_SEC },
{ RRD_FLAG_UPDATE_REASON_DB_ROTATION, "db rotation", 65 * USEC_PER_SEC },
{ RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS, "changed flags", 65 * USEC_PER_SEC },
+ { RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION, "updated retention", 65 * USEC_PER_SEC },
// terminator
{ 0, NULL, 0 },
@@ -1097,6 +1096,14 @@ static inline void rrdinstance_rrdset_is_freed(RRDSET *st) {
st->rrdcontext = NULL;
}
+static inline void rrdinstance_rrdset_has_updated_retention(RRDSET *st) {
+ RRDINSTANCE *ri = rrdset_get_rrdinstance(st);
+ if(unlikely(!ri)) return;
+
+ rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION);
+ rrdinstance_trigger_updates(ri, __FUNCTION__ );
+}
+
static inline void rrdinstance_updated_rrdset_name(RRDSET *st) {
// the chart may not be initialized when this is called
if(unlikely(!st->rrdinstance)) return;
@@ -1467,6 +1474,10 @@ void rrdcontext_removed_rrdset(RRDSET *st) {
rrdinstance_rrdset_is_freed(st);
}
+void rrdcontext_updated_retention_rrdset(RRDSET *st) {
+ rrdinstance_rrdset_has_updated_retention(st);
+}
+
void rrdcontext_updated_rrdset_name(RRDSET *st) {
rrdinstance_updated_rrdset_name(st);
}
@@ -2743,9 +2754,10 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) {
QUERY_TARGET *qt = &thread_query_target;
if(qt->used)
- fatal("QUERY TARGET: this query target is already used.");
+ fatal("QUERY TARGET: this query target is already used (%zu queries made with this QUERY_TARGET so far).", qt->queries);
qt->used = true;
+ qt->queries++;
// copy the request into query_thread_target
qt->request = *qtr;
@@ -3248,7 +3260,7 @@ static void rrdmetric_process_updates(RRDMETRIC *rm, bool force, RRD_FLAGS reaso
if(reason != RRD_FLAG_NONE)
rrd_flag_set_updated(rm, reason);
- if(!force && !rrd_flag_is_updated(rm) && rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION))
+ if(!force && !rrd_flag_is_updated(rm) && rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION) && !rrd_flag_check(rm, RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION))
return;
if(worker_jobs)
@@ -3282,7 +3294,11 @@ static void rrdinstance_post_process_updates(RRDINSTANCE *ri, bool force, RRD_FL
dfe_start_read((DICTIONARY *)ri->rrdmetrics, rm) {
if(unlikely(netdata_exit)) break;
- rrdmetric_process_updates(rm, force, reason, worker_jobs);
+ RRD_FLAGS reason_to_pass = reason;
+ if(rrd_flag_check(ri, RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION))
+ reason_to_pass |= RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION;
+
+ rrdmetric_process_updates(rm, force, reason_to_pass, worker_jobs);
if(unlikely(!rrd_flag_check(rm, RRD_FLAG_LIVE_RETENTION)))
live_retention = false;
@@ -3385,7 +3401,11 @@ static void rrdcontext_post_process_updates(RRDCONTEXT *rc, bool force, RRD_FLAG
dfe_start_reentrant(rc->rrdinstances, ri) {
if(unlikely(netdata_exit)) break;
- rrdinstance_post_process_updates(ri, force, reason, worker_jobs);
+ RRD_FLAGS reason_to_pass = reason;
+ if(rrd_flag_check(rc, RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION))
+ reason_to_pass |= RRD_FLAG_UPDATE_REASON_UPDATED_RETENTION;
+
+ rrdinstance_post_process_updates(ri, force, reason_to_pass, worker_jobs);
if(unlikely(hidden && !rrd_flag_check(ri, RRD_FLAG_HIDDEN)))
hidden = false;
diff --git a/database/rrdcontext.h b/database/rrdcontext.h
index 68736d4fdc..ab8f31b30e 100644
--- a/database/rrdcontext.h
+++ b/database/rrdcontext.h
@@ -87,6 +87,7 @@ void rrdcontext_updated_rrdset(RRDSET *st);
void rrdcontext_removed_rrdset(RRDSET *st);
void rrdcontext_updated_rrdset_name(RRDSET *st);
void rrdcontext_updated_rrdset_flags(RRDSET *st);
+void rrdcontext_updated_retention_rrdset(RRDSET *st);
void rrdcontext_collected_rrdset(RRDSET *st);
int rrdcontext_find_chart_uuid(RRDSET *st, uuid_t *store_uuid);
@@ -177,6 +178,7 @@ typedef struct query_target {
QUERY_TARGET_REQUEST request;
bool used; // when true, this query is currently being used
+ size_t queries; // how many query we have done so far
struct {
bool relative; // true when the request made with relative timestamps, true if it was absolute
diff --git a/database/rrdset.c b/database/rrdset.c
index 53b15d213f..8ebd52ba0f 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -1097,8 +1097,6 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n,
store_metric_at_tier(rd, t, sp, point_end_time_ut);
}
-
- rrdcontext_collected_rrddim(rd);
}
// caching of dimensions rrdset_done() and rrdset_done_interpolate() loop through
@@ -1257,6 +1255,7 @@ static inline size_t rrdset_done_interpolate(
if(unlikely(!store_this_entry)) {
(void) ml_is_anomalous(rd, 0, false);
rrddim_store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE);
+ rrdcontext_collected_rrddim(rd);
continue;
}
@@ -1269,6 +1268,7 @@ static inline size_t rrdset_done_interpolate(
}
rrddim_store_metric(rd, next_store_ut, new_value, dim_storage_flags);
+ rrdcontext_collected_rrddim(rd);
rd->last_stored_value = new_value;
}
else {
@@ -1277,6 +1277,7 @@ static inline size_t rrdset_done_interpolate(
rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING ", rrddim_name(rd), current_entry);
rrddim_store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE);
+ rrdcontext_collected_rrddim(rd);
rd->last_stored_value = NAN;
}