summaryrefslogtreecommitdiffstats
path: root/database/ram
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2022-07-06 14:01:53 +0300
committerGitHub <noreply@github.com>2022-07-06 14:01:53 +0300
commit49234f23de3a32682daff07ca229b6b62f24c090 (patch)
treea81ed628abcf4457737bcc3597b097e8e430497a /database/ram
parent8d5850fd49bf6308cd6cab690cdbba4a35505b39 (diff)
Multi-Tier database backend for long term metrics storage (#13263)
* Tier part 1 * Tier part 2 * Tier part 3 * Tier part 4 * Tier part 5 * Fix some ML compilation errors * fix more conflicts * pass proper tier * move metric_uuid from state to RRDDIM * move aclk_live_status from state to RRDDIM * move ml_dimension from state to RRDDIM * abstracted the data collection interface * support flushing for mem db too * abstracted the query api * abstracted latest/oldest time per metric * cleanup * store_metric for tier1 * fix for store_metric * allow multiple tiers, more than 2 * state to tier * Change storage type in db. Query param to request min, max, sum or average * Store tier data correctly * Fix skipping tier page type * Add tier grouping in the tier * Fix to handle archived charts (part 1) * Temp fix for query granularity when requesting tier1 data * Fix parameters in the correct order and calculate the anomaly based on the anomaly count * Proper tiering grouping * Anomaly calculation based on anomaly count * force type checking on storage handles * update cmocka tests * fully dynamic number of storage tiers * fix static allocation * configure grouping for all tiers; disable tiers for unittest; disable statsd configuration for private charts mode * use default page dt using the tiering info * automatic selection of tier * fix for automatic selection of tier * working prototype of dynamic tier selection * automatic selection of tier done right (I hope) * ask for the proper tier value, based on the grouping function * fixes for unittests and load_metric_next() * fixes for lgtm findings * minor renames * add dbengine to page cache size setting * add dbengine to page cache with malloc * query engine optimized to loop as little are required based on the view_update_every * query engine grouping methods now do not assume a constant number of points per group and they allocate memory with OWA * report db points per tier in jsonwrap * query planer that switches database tiers on the fly to satisfy the query for the entire timeframe * dbegnine statistics and documentation (in progress) * calculate average point duration in db * handle single point pages the best we can * handle single point pages even better * Keep page type in the rrdeng_page_descr * updated doc * handle future backwards compatibility - improved statistics * support &tier=X in queries * enfore increasing iterations on tiers * tier 1 is always 1 iteration * backfilling higher tiers on first data collection * reversed anomaly bit * set up to 5 tiers * natural points should only be offered on tier 0, except a specific tier is selected * do not allow more than 65535 points of tier0 to be aggregated on any tier * Work only on actually activated tiers * fix query interpolation * fix query interpolation again * fix lgtm finding * Activate one tier for now * backfilling of higher tiers using raw metrics from lower tiers * fix for crash on start when storage tiers is increased from the default * more statistics on exit * fix bug that prevented higher tiers to get any values; added backfilling options * fixed the statistics log line * removed limit of 255 iterations per tier; moved the code of freezing rd->tiers[x]->db_metric_handle * fixed division by zero on zero points_wanted * removed dead code * Decide on the descr->type for the type of metric * dont store metrics on unknown page types * free db_metric_handle on sql based context queries * Disable STORAGE_POINT value check in the exporting engine unit tests * fix for db modes other than dbengine * fix for aclk archived chart queries destroying db_metric_handles of valid rrddims * fix left-over freez() instead of OWA freez on median queries Co-authored-by: Costa Tsaousis <costa@netdata.cloud> Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'database/ram')
-rw-r--r--database/ram/rrddim_mem.c176
-rw-r--r--database/ram/rrddim_mem.h25
2 files changed, 167 insertions, 34 deletions
diff --git a/database/ram/rrddim_mem.c b/database/ram/rrddim_mem.c
index 606a45773c..4ab14877fa 100644
--- a/database/ram/rrddim_mem.c
+++ b/database/ram/rrddim_mem.c
@@ -5,34 +5,151 @@
// ----------------------------------------------------------------------------
// RRDDIM legacy data collection functions
-void rrddim_collect_init(RRDDIM *rd) {
+STORAGE_METRIC_HANDLE *rrddim_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance __maybe_unused) {
+ return (STORAGE_METRIC_HANDLE *)rd;
+}
+
+void rrddim_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle __maybe_unused) {
+ ;
+}
+
+STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle) {
+ RRDDIM *rd = (RRDDIM *)db_metric_handle;
rd->db[rd->rrdset->current_entry] = SN_EMPTY_SLOT;
- rd->state->handle = calloc(1, sizeof(struct mem_collect_handle));
+ struct mem_collect_handle *ch = calloc(1, sizeof(struct mem_collect_handle));
+ ch->rd = rd;
+ return (STORAGE_COLLECT_HANDLE *)ch;
}
-void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, NETDATA_DOUBLE number, SN_FLAGS flags) {
- (void)point_in_time;
+
+void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number,
+ NETDATA_DOUBLE min_value,
+ NETDATA_DOUBLE max_value,
+ uint16_t count,
+ uint16_t anomaly_count,
+ SN_FLAGS flags)
+{
+ UNUSED(point_in_time);
+ UNUSED(min_value);
+ UNUSED(max_value);
+ UNUSED(count);
+ UNUSED(anomaly_count);
+
+ struct mem_collect_handle *ch = (struct mem_collect_handle *)collection_handle;
+ RRDDIM *rd = ch->rd;
rd->db[rd->rrdset->current_entry] = pack_storage_number(number, flags);
}
-int rrddim_collect_finalize(RRDDIM *rd) {
- free((struct mem_collect_handle*)rd->state->handle);
+
+void rrddim_store_metric_flush(STORAGE_COLLECT_HANDLE *collection_handle) {
+ struct mem_collect_handle *ch = (struct mem_collect_handle *)collection_handle;
+ RRDDIM *rd = ch->rd;
+ memset(rd->db, 0, rd->entries * sizeof(storage_number));
+}
+
+int rrddim_collect_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
+ free(collection_handle);
return 0;
}
// ----------------------------------------------------------------------------
+
+// get the total duration in seconds of the round robin database
+#define rrddim_duration(st) (( (time_t)(rd)->rrdset->counter >= (time_t)(rd)->rrdset->entries ? (time_t)(rd)->rrdset->entries : (time_t)(rd)->rrdset->counter ) * (time_t)(rd)->rrdset->update_every)
+
+// get the last slot updated in the round robin database
+#define rrddim_last_slot(rd) ((size_t)(((rd)->rrdset->current_entry == 0) ? (rd)->rrdset->entries - 1 : (rd)->rrdset->current_entry - 1))
+
+// return the slot that has the oldest value
+#define rrddim_first_slot(rd) ((size_t)((rd)->rrdset->counter >= (size_t)(rd)->rrdset->entries ? (rd)->rrdset->current_entry : 0))
+
+// get the slot of the round robin database, for the given timestamp (t)
+// it always returns a valid slot, although may not be for the time requested if the time is outside the round robin database
+// only valid when not using dbengine
+static inline size_t rrddim_time2slot(RRDDIM *rd, time_t t) {
+ size_t ret = 0;
+ time_t last_entry_t = rrddim_query_latest_time((STORAGE_METRIC_HANDLE *)rd);
+ time_t first_entry_t = rrddim_query_oldest_time((STORAGE_METRIC_HANDLE *)rd);
+ size_t entries = rd->rrdset->entries;
+ size_t first_slot = rrddim_first_slot(rd);
+ size_t last_slot = rrddim_last_slot(rd);
+ size_t update_every = rd->rrdset->update_every;
+
+ if(t >= last_entry_t) {
+ // the requested time is after the last entry we have
+ ret = last_slot;
+ }
+ else {
+ if(t <= first_entry_t) {
+ // the requested time is before the first entry we have
+ ret = first_slot;
+ }
+ else {
+ if(last_slot >= (size_t)((last_entry_t - t) / update_every))
+ ret = last_slot - ((last_entry_t - t) / update_every);
+ else
+ ret = last_slot - ((last_entry_t - t) / update_every) + entries;
+ }
+ }
+
+ if(unlikely(ret >= entries)) {
+ error("INTERNAL ERROR: rrddim_time2slot() on %s returns values outside entries", rd->name);
+ ret = entries - 1;
+ }
+
+ return ret;
+}
+
+// get the timestamp of a specific slot in the round robin database
+// only valid when not using dbengine
+static inline time_t rrddim_slot2time(RRDDIM *rd, size_t slot) {
+ time_t ret;
+ time_t last_entry_t = rrddim_query_latest_time((STORAGE_METRIC_HANDLE *)rd);
+ time_t first_entry_t = rrddim_query_oldest_time((STORAGE_METRIC_HANDLE *)rd);
+ size_t entries = rd->rrdset->entries;
+ size_t last_slot = rrddim_last_slot(rd);
+ size_t update_every = rd->rrdset->update_every;
+
+ if(slot >= entries) {
+ error("INTERNAL ERROR: caller of rrddim_slot2time() gives invalid slot %zu", slot);
+ slot = entries - 1;
+ }
+
+ if(slot > last_slot)
+ ret = last_entry_t - (time_t)(update_every * (last_slot - slot + entries));
+ else
+ ret = last_entry_t - (time_t)(update_every * (last_slot - slot));
+
+ if(unlikely(ret < first_entry_t)) {
+ error("INTERNAL ERROR: rrddim_slot2time() on %s returns time too far in the past", rd->name);
+ ret = first_entry_t;
+ }
+
+ if(unlikely(ret > last_entry_t)) {
+ error("INTERNAL ERROR: rrddim_slot2time() on %s returns time into the future", rd->name);
+ ret = last_entry_t;
+ }
+
+ return ret;
+}
+
+// ----------------------------------------------------------------------------
// RRDDIM legacy database query functions
-void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time) {
+void rrddim_query_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *handle, time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type) {
+ UNUSED(tier_query_fetch_type);
+
+ RRDDIM *rd = (RRDDIM *)db_metric_handle;
+
handle->rd = rd;
handle->start_time = start_time;
handle->end_time = end_time;
struct mem_query_handle* h = calloc(1, sizeof(struct mem_query_handle));
- h->slot = rrdset_time2slot(rd->rrdset, start_time);
- h->last_slot = rrdset_time2slot(rd->rrdset, end_time);
- h->dt = rd->update_every;
+ h->slot = rrddim_time2slot(rd, start_time);
+ h->last_slot = rrddim_time2slot(rd, end_time);
+ h->dt = rd->rrdset->update_every;
h->next_timestamp = start_time;
- h->slot_timestamp = rrdset_slot2time(rd->rrdset, h->slot);
- h->last_timestamp = rrdset_slot2time(rd->rrdset, h->last_slot);
+ h->slot_timestamp = rrddim_slot2time(rd, h->slot);
+ h->last_timestamp = rrddim_slot2time(rd, h->last_slot);
// info("RRDDIM QUERY INIT: start %ld, end %ld, next %ld, first %ld, last %ld, dt %ld", start_time, end_time, h->next_timestamp, h->slot_timestamp, h->last_timestamp, h->dt);
@@ -42,28 +159,30 @@ void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, time_t st
// Returns the metric and sets its timestamp into current_time
// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
-NETDATA_DOUBLE
-rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *start_time, time_t *end_time, SN_FLAGS *flags) {
+STORAGE_POINT rrddim_query_next_metric(struct rrddim_query_handle *handle) {
RRDDIM *rd = handle->rd;
struct mem_query_handle* h = (struct mem_query_handle*)handle->handle;
size_t entries = rd->rrdset->entries;
size_t slot = h->slot;
+ STORAGE_POINT sp;
+ sp.count = 1;
+
time_t this_timestamp = h->next_timestamp;
h->next_timestamp += h->dt;
// set this timestamp for our caller
- *start_time = this_timestamp - h->dt;
- *end_time = this_timestamp;
+ sp.start_time = this_timestamp - h->dt;
+ sp.end_time = this_timestamp;
if(unlikely(this_timestamp < h->slot_timestamp)) {
- *flags = SN_EMPTY_SLOT;
- return NAN;
+ storage_point_empty(sp, sp.start_time, sp.end_time);
+ return sp;
}
if(unlikely(this_timestamp > h->last_timestamp)) {
- *flags = SN_EMPTY_SLOT;
- return NAN;
+ storage_point_empty(sp, sp.start_time, sp.end_time);
+ return sp;
}
storage_number n = rd->db[slot++];
@@ -72,8 +191,11 @@ rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *start_time,
h->slot = slot;
h->slot_timestamp += h->dt;
- *flags = (n & SN_ALL_FLAGS);
- return unpack_storage_number(n);
+ sp.anomaly_count = (n & SN_ANOMALY_BIT) ? 0 : 1;
+ sp.flags = (n & SN_ALL_FLAGS);
+ sp.min = sp.max = sp.sum = unpack_storage_number(n);
+
+ return sp;
}
int rrddim_query_is_finished(struct rrddim_query_handle *handle) {
@@ -89,10 +211,12 @@ void rrddim_query_finalize(struct rrddim_query_handle *handle) {
freez(handle->handle);
}
-time_t rrddim_query_latest_time(RRDDIM *rd) {
- return rrdset_last_entry_t_nolock(rd->rrdset);
+time_t rrddim_query_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
+ RRDDIM *rd = (RRDDIM *)db_metric_handle;
+ return rd->rrdset->last_updated.tv_sec;
}
-time_t rrddim_query_oldest_time(RRDDIM *rd) {
- return rrdset_first_entry_t_nolock(rd->rrdset);
+time_t rrddim_query_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
+ RRDDIM *rd = (RRDDIM *)db_metric_handle;
+ return (time_t)(rd->rrdset->last_updated.tv_sec - rrddim_duration(rd));
}
diff --git a/database/ram/rrddim_mem.h b/database/ram/rrddim_mem.h
index fac8194b79..400bdd0c28 100644
--- a/database/ram/rrddim_mem.h
+++ b/database/ram/rrddim_mem.h
@@ -6,6 +6,7 @@
#include "database/rrd.h"
struct mem_collect_handle {
+ RRDDIM *rd;
long slot;
long entries;
};
@@ -19,16 +20,24 @@ struct mem_query_handle {
size_t last_slot;
};
-extern void rrddim_collect_init(RRDDIM *rd);
-extern void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, NETDATA_DOUBLE number, SN_FLAGS flags);
-extern int rrddim_collect_finalize(RRDDIM *rd);
+extern STORAGE_METRIC_HANDLE *rrddim_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance);
+extern void rrddim_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle);
-extern void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time);
-extern NETDATA_DOUBLE
-rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *start_time, time_t *end_time, SN_FLAGS *flags);
+extern STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle);
+extern void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number,
+ NETDATA_DOUBLE min_value,
+ NETDATA_DOUBLE max_value,
+ uint16_t count,
+ uint16_t anomaly_count,
+ SN_FLAGS flags);
+extern void rrddim_store_metric_flush(STORAGE_COLLECT_HANDLE *collection_handle);
+extern int rrddim_collect_finalize(STORAGE_COLLECT_HANDLE *collection_handle);
+
+extern void rrddim_query_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *handle, time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type);
+extern STORAGE_POINT rrddim_query_next_metric(struct rrddim_query_handle *handle);
extern int rrddim_query_is_finished(struct rrddim_query_handle *handle);
extern void rrddim_query_finalize(struct rrddim_query_handle *handle);
-extern time_t rrddim_query_latest_time(RRDDIM *rd);
-extern time_t rrddim_query_oldest_time(RRDDIM *rd);
+extern time_t rrddim_query_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
+extern time_t rrddim_query_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
#endif