path: root/database/rrdset.c
author: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> 2022-07-06 14:01:53 +0300
committer: GitHub <noreply@github.com> 2022-07-06 14:01:53 +0300
commit 49234f23de3a32682daff07ca229b6b62f24c090 (patch)
tree a81ed628abcf4457737bcc3597b097e8e430497a /database/rrdset.c
parent 8d5850fd49bf6308cd6cab690cdbba4a35505b39 (diff)
Multi-Tier database backend for long term metrics storage (#13263)
* Tier part 1
* Tier part 2
* Tier part 3
* Tier part 4
* Tier part 5
* Fix some ML compilation errors
* fix more conflicts
* pass proper tier
* move metric_uuid from state to RRDDIM
* move aclk_live_status from state to RRDDIM
* move ml_dimension from state to RRDDIM
* abstracted the data collection interface
* support flushing for mem db too
* abstracted the query api
* abstracted latest/oldest time per metric
* cleanup
* store_metric for tier1
* fix for store_metric
* allow multiple tiers, more than 2
* state to tier
* Change storage type in db. Query param to request min, max, sum or average
* Store tier data correctly
* Fix skipping tier page type
* Add tier grouping in the tier
* Fix to handle archived charts (part 1)
* Temp fix for query granularity when requesting tier1 data
* Fix parameters in the correct order and calculate the anomaly based on the anomaly count
* Proper tiering grouping
* Anomaly calculation based on anomaly count
* force type checking on storage handles
* update cmocka tests
* fully dynamic number of storage tiers
* fix static allocation
* configure grouping for all tiers; disable tiers for unittest; disable statsd configuration for private charts mode
* use default page dt using the tiering info
* automatic selection of tier
* fix for automatic selection of tier
* working prototype of dynamic tier selection
* automatic selection of tier done right (I hope)
* ask for the proper tier value, based on the grouping function
* fixes for unittests and load_metric_next()
* fixes for lgtm findings
* minor renames
* add dbengine to page cache size setting
* add dbengine to page cache with malloc
* query engine optimized to loop as little as required based on the view_update_every
* query engine grouping methods now do not assume a constant number of points per group and they allocate memory with OWA
* report db points per tier in jsonwrap
* query planner that switches database tiers on the fly to satisfy the query for the entire timeframe
* dbengine statistics and documentation (in progress)
* calculate average point duration in db
* handle single point pages the best we can
* handle single point pages even better
* Keep page type in the rrdeng_page_descr
* updated doc
* handle future backwards compatibility - improved statistics
* support &tier=X in queries
* enforce increasing iterations on tiers
* tier 1 is always 1 iteration
* backfilling higher tiers on first data collection
* reversed anomaly bit
* set up to 5 tiers
* natural points should only be offered on tier 0, unless a specific tier is selected
* do not allow more than 65535 points of tier0 to be aggregated on any tier
* Work only on actually activated tiers
* fix query interpolation
* fix query interpolation again
* fix lgtm finding
* Activate one tier for now
* backfilling of higher tiers using raw metrics from lower tiers
* fix for crash on start when storage tiers is increased from the default
* more statistics on exit
* fix bug that prevented higher tiers from getting any values; added backfilling options
* fixed the statistics log line
* removed limit of 255 iterations per tier; moved the code that freezes rd->tiers[x]->db_metric_handle
* fixed division by zero on zero points_wanted
* removed dead code
* Decide on the descr->type for the type of metric
* don't store metrics on unknown page types
* free db_metric_handle on sql based context queries
* Disable STORAGE_POINT value check in the exporting engine unit tests
* fix for db modes other than dbengine
* fix for aclk archived chart queries destroying db_metric_handles of valid rrddims
* fix left-over freez() instead of OWA freez on median queries

Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Co-authored-by: Vladimir Kobal <vlad@prokk.net>
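For a sense of the storage arithmetic behind the tiers: each higher tier keeps one aggregated point per tier_grouping points of the tier below it, so point counts shrink multiplicatively as retention grows. A minimal sketch of that math (the grouping factors used here are illustrative, not the commit's defaults):

    #include <stdio.h>

    int main(void) {
        // Illustrative per-tier aggregation factors; the real factors are
        // configurable and these are not the commit's defaults.
        const long grouping[] = { 1, 60, 3600 };
        const long update_every = 1;   // seconds between raw collections
        const long window = 86400;     // one day of collection

        for (int tier = 0; tier < 3; tier++)
            printf("tier %d: %ld points per day\n",
                   tier, window / (update_every * grouping[tier]));
        return 0;
    }

With these factors a day of per-second data is 86400 points at tier 0, 1440 at tier 1 and 24 at tier 2, which is what lets the higher tiers hold far longer timeframes in the same space.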
Diffstat (limited to 'database/rrdset.c')
-rw-r--r--  database/rrdset.c  142
1 file changed, 128 insertions(+), 14 deletions(-)
diff --git a/database/rrdset.c b/database/rrdset.c
index 1bca6c1349..1ae862a22c 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -273,12 +273,13 @@ void rrdset_reset(RRDSET *st) {
rd->last_collected_time.tv_sec = 0;
rd->last_collected_time.tv_usec = 0;
rd->collections_counter = 0;
- // memset(rd->values, 0, rd->entries * sizeof(storage_number));
-#ifdef ENABLE_DBENGINE
- if (RRD_MEMORY_MODE_DBENGINE == st->rrd_memory_mode && !rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
- rrdeng_store_metric_flush_current_page(rd);
+
+ if(!rrddim_flag_check(rd, RRDDIM_FLAG_ARCHIVED)) {
+ for(int tier = 0; tier < storage_tiers ;tier++) {
+ if(rd->tiers[tier])
+ rd->tiers[tier]->collect_ops.flush(rd->tiers[tier]->db_collection_handle);
+ }
}
-#endif
}
}
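The new flush loop treats each tier as an opaque storage engine reached through a small table of function pointers. Reconstructed roughly from the call sites in this patch (the authoritative declarations live in the RRD headers, so field order and exact types here are an approximation):

    #include <stdint.h>

    // Stand-in typedefs so the sketch is self-contained; in Netdata these
    // come from libnetdata (usec_t, NETDATA_DOUBLE, SN_FLAGS) and an opaque
    // per-engine collection handle.
    typedef uint64_t usec_t;
    typedef double NETDATA_DOUBLE;
    typedef uint32_t SN_FLAGS;
    typedef struct storage_collect_handle STORAGE_COLLECT_HANDLE;

    // Per-tier collection interface as inferred from the call sites in this
    // patch; a sketch, not the header's literal declaration.
    struct rrddim_collect_ops {
        // append one aggregated point to the tier's currently open page
        void (*store_metric)(STORAGE_COLLECT_HANDLE *handle, usec_t point_in_time,
                             NETDATA_DOUBLE sum, NETDATA_DOUBLE min, NETDATA_DOUBLE max,
                             uint16_t count, uint16_t anomaly_count, SN_FLAGS flags);

        // complete the currently open page, making it visible to queries
        void (*flush)(STORAGE_COLLECT_HANDLE *handle);

        // end collection; non-zero means the metric holds no data and
        // no references, so it may be deleted
        int (*finalize)(STORAGE_COLLECT_HANDLE *handle);
    };

Because every memory mode now implements flush(), the old ENABLE_DBENGINE guard around the reset path is no longer needed.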
@@ -963,6 +964,105 @@ static inline usec_t rrdset_init_last_updated_time(RRDSET *st) {
return last_updated_ut;
}
+static inline time_t tier_next_point_time(RRDDIM *rd, struct rrddim_tier *t, time_t now) {
+ time_t loop = (time_t)rd->update_every * (time_t)t->tier_grouping;
+ return now + loop - ((now + loop) % loop);
+}
+
+void store_metric_at_tier(RRDDIM *rd, struct rrddim_tier *t, STORAGE_POINT sp, usec_t now_ut) {
+ if (unlikely(!t->next_point_time))
+ t->next_point_time = tier_next_point_time(rd, t, sp.end_time);
+
+ // merge the dates into our virtual point
+ if (unlikely(sp.start_time < t->virtual_point.start_time))
+ t->virtual_point.start_time = sp.start_time;
+
+ if (likely(sp.end_time > t->virtual_point.end_time))
+ t->virtual_point.end_time = sp.end_time;
+
+ // merge the values into our virtual point
+ if (likely(!storage_point_is_empty(sp))) {
+ // we aggregate only non NULLs into higher tiers
+
+ if (likely(!storage_point_is_unset(t->virtual_point))) {
+ // merge the collected point to our virtual one
+ t->virtual_point.sum += sp.sum;
+ t->virtual_point.min = MIN(t->virtual_point.min, sp.min);
+ t->virtual_point.max = MAX(t->virtual_point.max, sp.max);
+ t->virtual_point.count += sp.count;
+ t->virtual_point.anomaly_count += sp.anomaly_count;
+ t->virtual_point.flags |= sp.flags;
+ }
+ else {
+ // reset our virtual point to this one
+ t->virtual_point = sp;
+ }
+ }
+
+ if(unlikely(sp.end_time >= t->next_point_time)) {
+ if (likely(!storage_point_is_unset(t->virtual_point))) {
+
+ t->collect_ops.store_metric(
+ t->db_collection_handle,
+ now_ut,
+ t->virtual_point.sum,
+ t->virtual_point.min,
+ t->virtual_point.max,
+ t->virtual_point.count,
+ t->virtual_point.anomaly_count,
+ t->virtual_point.flags);
+ }
+ else {
+ t->collect_ops.store_metric(
+ t->db_collection_handle,
+ now_ut,
+ NAN,
+ NAN,
+ NAN,
+ 0,
+ 0,
+ SN_EMPTY_SLOT);
+ }
+
+ t->virtual_point.count = 0;
+ t->next_point_time = tier_next_point_time(rd, t, sp.end_time);
+ }
+}
+
+static void store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) {
+
+ // store the metric on tier 0
+ rd->tiers[0]->collect_ops.store_metric(rd->tiers[0]->db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags);
+
+ for(int tier = 1; tier < storage_tiers ;tier++) {
+ if(unlikely(!rd->tiers[tier])) continue;
+
+ struct rrddim_tier *t = rd->tiers[tier];
+
+ time_t now = (time_t)(point_end_time_ut / USEC_PER_SEC);
+
+ if(!t->last_collected_ut) {
+ // we have not collected this tier before
+ // let's fill any gap that may exist
+ rrdr_fill_tier_gap_from_smaller_tiers(rd, tier, now);
+ }
+
+ STORAGE_POINT sp = {
+ .start_time = now - rd->update_every,
+ .end_time = now,
+ .min = n,
+ .max = n,
+ .sum = n,
+ .count = 1,
+ .anomaly_count = (flags & SN_ANOMALY_BIT) ? 0 : 1,
+ .flags = flags
+ };
+
+ t->last_collected_ut = point_end_time_ut;
+ store_metric_at_tier(rd, t, sp, point_end_time_ut);
+ }
+}
+
static inline size_t rrdset_done_interpolate(
RRDSET *st
, usec_t update_every_ut
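Two pieces of the new code above can be checked in isolation: the boundary arithmetic of tier_next_point_time(), which snaps each tier to the next multiple of update_every * tier_grouping strictly after now, and the min/max/sum/count merge that store_metric_at_tier() performs on its virtual point. A self-contained sketch with simplified stand-in types (not the patch's STORAGE_POINT):

    #include <stdio.h>
    #include <time.h>

    #define MIN(a, b) ((a) < (b) ? (a) : (b))
    #define MAX(a, b) ((a) > (b) ? (a) : (b))

    // Same arithmetic as tier_next_point_time(): the next multiple of
    // loop = update_every * tier_grouping that lies strictly after `now`.
    static time_t next_point_time(time_t now, time_t update_every, time_t grouping) {
        time_t loop = update_every * grouping;
        return now + loop - ((now + loop) % loop);
    }

    // Simplified stand-in for the aggregation that store_metric_at_tier()
    // applies to its virtual point.
    struct point { double min, max, sum; int count; };

    static void merge(struct point *vp, double v) {
        if (!vp->count) { *vp = (struct point){ v, v, v, 1 }; return; }
        vp->min = MIN(vp->min, v);
        vp->max = MAX(vp->max, v);
        vp->sum += v;
        vp->count++;
    }

    int main(void) {
        // a tier with update_every = 1s and tier_grouping = 60 flushes on minute edges
        printf("next after 123 -> %ld\n", (long)next_point_time(123, 1, 60)); // 180
        printf("next after 180 -> %ld\n", (long)next_point_time(180, 1, 60)); // 240

        struct point vp = { 0 };
        double samples[] = { 3.0, 9.0, 6.0 };
        for (int i = 0; i < 3; i++)
            merge(&vp, samples[i]);

        // one stored tier point carries enough to answer min, max, sum or
        // average (sum / count) at query time, as the commit message describes
        printf("min=%g max=%g sum=%g count=%d avg=%g\n",
               vp.min, vp.max, vp.sum, vp.count, vp.sum / vp.count);
        return 0;
    }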
@@ -1086,8 +1186,8 @@ static inline size_t rrdset_done_interpolate(
if(unlikely(!store_this_entry)) {
(void) ml_is_anomalous(rd, 0, false);
-
- rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT);
+// rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, 0, 0, 1, SN_EMPTY_SLOT, 0);
+ store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT);
continue;
}
@@ -1099,7 +1199,8 @@ static inline size_t rrdset_done_interpolate(
dim_storage_flags &= ~ ((uint32_t) SN_ANOMALY_BIT);
}
- rd->state->collect_ops.store_metric(rd, next_store_ut, new_value, dim_storage_flags);
+// rd->state->collect_ops.store_metric(rd, next_store_ut, new_value, 0, 0, 1, dim_storage_flags, 0);
+ store_metric(rd, next_store_ut, new_value, dim_storage_flags);
rd->last_stored_value = new_value;
}
else {
@@ -1112,7 +1213,8 @@ static inline size_t rrdset_done_interpolate(
);
#endif
- rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT);
+// rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, 0, 0, 1, SN_EMPTY_SLOT, 0);
+ store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT);
rd->last_stored_value = NAN;
}
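The three hunks above route every stored slot through the new store_metric() fan-out, including the empty ones, so the call sites reduce to two shapes (taken directly from the hunks):

    // a value was interpolated for this point: store it on all tiers
    store_metric(rd, next_store_ut, new_value, dim_storage_flags);

    // nothing usable was collected: write an explicit empty slot instead;
    // tier 0 records the gap, and higher tiers skip merging empty points
    // into their virtual point (the "aggregate only non NULLs" rule above)
    store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT);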
@@ -1597,10 +1699,10 @@ after_first_database_work:
// it is now time to interpolate values on a second boundary
#ifdef NETDATA_INTERNAL_CHECKS
- if(unlikely(now_collect_ut < next_store_ut)) {
+ if(unlikely(now_collect_ut < next_store_ut && st->counter_done > 1)) {
// this is collected in the same interpolation point
rrdset_debug(st, "THIS IS IN THE SAME INTERPOLATION POINT");
- info("INTERNAL CHECK: host '%s', chart '%s' is collected in the same interpolation point: short by %llu microseconds", st->rrdhost->hostname, st->name, next_store_ut - now_collect_ut);
+ info("INTERNAL CHECK: host '%s', chart '%s' collection %zu is in the same interpolation point: short by %llu microseconds", st->rrdhost->hostname, st->name, st->counter_done, next_store_ut - now_collect_ut);
}
#endif
@@ -1734,10 +1836,22 @@ after_second_database_work:
rrddim_flag_clear(rd, RRDDIM_FLAG_OBSOLETE);
/* only a collector can mark a chart as obsolete, so we must remove the reference */
- uint8_t can_delete_metric = rd->state->collect_ops.finalize(rd);
- if (can_delete_metric) {
+
+ size_t tiers_available = 0, tiers_said_yes = 0;
+ for(int tier = 0; tier < storage_tiers ;tier++) {
+ if(rd->tiers[tier]) {
+ tiers_available++;
+
+ if(rd->tiers[tier]->collect_ops.finalize(rd->tiers[tier]->db_collection_handle))
+ tiers_said_yes++;
+
+ rd->tiers[tier]->db_collection_handle = NULL;
+ }
+ }
+
+ if (tiers_available == tiers_said_yes && tiers_said_yes) {
/* This metric has no data and no references */
- delete_dimension_uuid(&rd->state->metric_uuid);
+ delete_dimension_uuid(&rd->metric_uuid);
} else {
/* Do not delete this dimension */
#ifdef ENABLE_ACLK
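The deletion rule introduced in this last hunk is a consensus across tiers: the dimension's UUID is removed only when every tier that exists for the dimension reports, via finalize(), that its metric holds no data and no references. Condensed into a standalone helper with the hypothetical name all_tiers_can_delete (same logic as the hunk, assuming the surrounding Netdata declarations and <stdbool.h>):

    // Sketch of the hunk's deletion rule: true only when every available
    // tier's finalize() agreed the metric can be deleted.
    static bool all_tiers_can_delete(RRDDIM *rd) {
        size_t available = 0, said_yes = 0;
        for (int tier = 0; tier < storage_tiers; tier++) {
            if (!rd->tiers[tier]) continue;
            available++;
            if (rd->tiers[tier]->collect_ops.finalize(rd->tiers[tier]->db_collection_handle))
                said_yes++;
            rd->tiers[tier]->db_collection_handle = NULL;
        }
        return available && available == said_yes;
    }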