summary | refs | log | tree | commit | diff | stats
path: root/database
diff options
context:
space:
mode:
author Costa Tsaousis <costa@netdata.cloud> 2022-06-22 11:19:08 +0300
committer GitHub <noreply@github.com> 2022-06-22 11:19:08 +0300
commit b32ca44319e35eb38e5858730f2ea44ea2268926 (patch)
tree 40814c014d919657f6c96d55c0947631a1fb86ee /database
parent 7b6e23e98c7c5624a76f8dd3435b41594fb5f39f (diff)
Query Engine multi-granularity support (and MC improvements) (#13155)
* set grouping functions * storage engine should check the validity of timestamps, not the query engine * calculate and store in RRDR anomaly rates for every query * anomaly rate used by volume metric correlations * mc volume should use absolute data, to avoid cancelling effect * return anomaly-rates in jsonwrap with jw-anomaly-rates option to data queries * don't return null on anomaly rates * allow passing group query options from the URL * added countif to the query engine and used it in metric correlations * fix configure * fix countif and anomaly rate percentages * added group_options to metric correlations; updated swagger * added newline at the end of yaml file * always check the time the highlighted window was above/below the baseline window * properly track time in memory queries * error for internal checks only * moved pack_storage_number() into the storage engines * moved unpack_storage_number() inside the storage engines * remove old comment * pass unit tests * properly detect zero or subnormal values in pack_storage_number() * fill nulls before the value, not after * make sure math.h is included * workaround for isfinite() * fix for isfinite() * faster isfinite() alternative * fix for faster isfinite() alternative * next_metric() now returns end_time too * variable step implemented in a generic way * remove left-over variables * ensure we always complete the wanted number of points * fixes * ensure no infinite loop * mc-volume-improvements: Add information about invalid condition * points should have a duration in the past * removed unneeded info() line * Fix unit tests for exporting engine * new_point should only be checked when it is fetched from the db; better comment about the premature breaking of the main query loop Co-authored-by: Thiago Marques <thiagoftsm@gmail.com> Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'database')
-rwxr-xr-x database/engine/rrdengineapi.c 48
-rw-r--r-- database/engine/rrdengineapi.h 4
-rw-r--r-- database/metric_correlations.c 129
-rw-r--r-- database/metric_correlations.h 3
-rw-r--r-- database/ram/rrddim_mem.c 54
-rw-r--r-- database/ram/rrddim_mem.h 14
-rw-r--r-- database/rrd.h 20
-rw-r--r-- database/rrdset.c 51
8 files changed, 196 insertions, 127 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index ceead28c41..0792ebe001 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -193,8 +193,10 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
handle->descr = NULL;
}
-void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number)
+void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, calculated_number n, SN_FLAGS flags)
{
+ storage_number number = pack_storage_number(n, flags);
+
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle;
struct rrdengine_instance *ctx;
struct page_cache *pg_cache;
@@ -509,6 +511,8 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e
*/
void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time)
{
+ // fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time);
+
struct rrdeng_query_handle *handle;
struct rrdengine_instance *ctx;
unsigned pages_nr;
@@ -520,6 +524,8 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
handle = callocz(1, sizeof(struct rrdeng_query_handle));
handle->next_page_time = start_time;
handle->now = start_time;
+ handle->dt = rd->update_every * USEC_PER_SEC;
+ handle->dt_sec = rd->update_every;
handle->position = 0;
handle->ctx = ctx;
handle->descr = NULL;
@@ -527,7 +533,7 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
NULL, &handle->page_index);
if (unlikely(NULL == handle->page_index || 0 == pages_nr))
- /* there are no metrics to load */
+ // there are no metrics to load
handle->next_page_time = INVALID_TIME;
}
@@ -585,40 +591,50 @@ static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) {
usec_t entries = handle->entries = page_length / sizeof(storage_number);
if (likely(entries > 1))
handle->dt = (page_end_time - descr->start_time) / (entries - 1);
- else
- handle->dt = 0;
- handle->dt_sec = handle->dt / USEC_PER_SEC;
+ handle->dt_sec = (time_t)(handle->dt / USEC_PER_SEC);
handle->position = position;
return 0;
}
-/* Returns the metric and sets its timestamp into current_time */
-storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) {
+// Returns the metric and sets its timestamp into current_time
+// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
+// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
+calculated_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *start_time, time_t *end_time, SN_FLAGS *flags) {
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
- if (unlikely(INVALID_TIME == handle->next_page_time))
- return SN_EMPTY_SLOT;
-
struct rrdeng_page_descr *descr = handle->descr;
unsigned position = handle->position + 1;
time_t now = handle->now + handle->dt_sec;
+ if (unlikely(INVALID_TIME == handle->next_page_time)) {
+ handle->next_page_time = INVALID_TIME;
+ handle->now = now;
+ *start_time = now - handle->dt_sec;
+ *end_time = now;
+ *flags = SN_EMPTY_SLOT;
+ return NAN;
+ }
+
if (unlikely(!descr || position >= handle->entries)) {
// We need to get a new page
if(rrdeng_load_page_next(rrdimm_handle)) {
// next calls will not load any more metrics
handle->next_page_time = INVALID_TIME;
- return SN_EMPTY_SLOT;
+ handle->now = now;
+ *start_time = now - handle->dt_sec;
+ *end_time = now;
+ *flags = SN_EMPTY_SLOT;
+ return NAN;
}
descr = handle->descr;
position = handle->position;
- now = (descr->start_time + position * handle->dt) / USEC_PER_SEC;
+ now = (time_t)((descr->start_time + position * handle->dt) / USEC_PER_SEC);
}
- storage_number ret = handle->page[position];
+ storage_number n = handle->page[position];
handle->position = position;
handle->now = now;
@@ -627,8 +643,10 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
handle->next_page_time = INVALID_TIME;
}
- *current_time = now;
- return ret;
+ *flags = n & SN_ALL_FLAGS;
+ *start_time = now - handle->dt_sec;
+ *end_time = now;
+ return unpack_storage_number(n);
}
int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index 67443c1539..798545d9d7 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -40,14 +40,14 @@ extern void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN +
extern void rrdeng_metric_init(RRDDIM *rd);
extern void rrdeng_store_metric_init(RRDDIM *rd);
extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd);
-extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number);
+extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, calculated_number number, SN_FLAGS flags);
extern int rrdeng_store_metric_finalize(RRDDIM *rd);
extern unsigned
rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time,
struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list);
extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle,
time_t start_time, time_t end_time);
-extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time);
+extern calculated_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *start_time, time_t *end_time, SN_FLAGS *flags);
extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle);
extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
diff --git a/database/metric_correlations.c b/database/metric_correlations.c
index 925768f5da..148f2acbce 100644
--- a/database/metric_correlations.c
+++ b/database/metric_correlations.c
@@ -9,6 +9,7 @@ int metric_correlations_version = 1;
METRIC_CORRELATIONS_METHOD default_metric_correlations_method = METRIC_CORRELATIONS_KS2;
typedef struct mc_stats {
+ calculated_number max_base_high_ratio;
size_t db_points;
size_t result_points;
size_t db_queries;
@@ -46,7 +47,13 @@ const char *mc_method_to_string(METRIC_CORRELATIONS_METHOD method) {
// ----------------------------------------------------------------------------
// The results per dimension are aggregated into a dictionary
+typedef enum {
+ RESULT_IS_BASE_HIGH_RATIO = (1 << 0),
+ RESULT_IS_PERCENTAGE_OF_TIME = (1 << 1),
+} RESULT_FLAGS;
+
struct register_result {
+ RESULT_FLAGS flags;
RRDSET *st;
const char *chart_id;
const char *context;
@@ -86,15 +93,26 @@ static void register_result_destroy(DICTIONARY *results) {
dictionary_destroy(results);
}
-static void register_result(DICTIONARY *results, RRDSET *st, RRDDIM *d, calculated_number value) {
+static void register_result(DICTIONARY *results, RRDSET *st, RRDDIM *d, calculated_number value, RESULT_FLAGS flags, MC_STATS *stats) {
if(!calculated_number_isnumber(value)) return;
+ // make it positive
+ calculated_number v = calculated_number_fabs(value);
+
+ // no need to store zero scored values
+ if(v == 0.0) return;
+
+ // keep track of the max of the baseline / highlight ratio
+ if(flags & RESULT_IS_BASE_HIGH_RATIO && v > stats->max_base_high_ratio)
+ stats->max_base_high_ratio = v;
+
struct register_result t = {
+ .flags = flags,
.st = st,
.chart_id = st->id,
.context = st->context,
.dim_name = d->name,
- .value = value
+ .value = v
};
char buf[5000 + 1];
@@ -361,7 +379,8 @@ static double kstwo(calculated_number baseline[], int baseline_points, calculate
static int rrdset_metric_correlations_ks2(RRDSET *st, DICTIONARY *results,
long long baseline_after, long long baseline_before,
long long after, long long before,
- long long points, RRDR_OPTIONS options, RRDR_GROUPING group,
+ long long points, RRDR_OPTIONS options,
+ RRDR_GROUPING group, const char *group_options,
uint32_t shifts, int timeout, MC_STATS *stats) {
long group_time = 0;
struct context_param *context_param_list = NULL;
@@ -377,7 +396,8 @@ static int rrdset_metric_correlations_ks2(RRDSET *st, DICTIONARY *results,
ONEWAYALLOC *owa = onewayalloc_create(0);
high_rrdr = rrd2rrdr(owa, st, points,
after, before, group,
- group_time, options, NULL, context_param_list, timeout);
+ group_time, options, NULL, context_param_list, group_options,
+ timeout);
if(!high_rrdr) {
info("Metric correlations: rrd2rrdr() failed for the highlighted window on chart '%s'.", st->name);
goto cleanup;
@@ -402,7 +422,7 @@ static int rrdset_metric_correlations_ks2(RRDSET *st, DICTIONARY *results,
stats->db_queries++;
base_rrdr = rrd2rrdr(owa, st,high_points << shifts,
baseline_after, baseline_before, group,
- group_time, options, NULL, context_param_list,
+ group_time, options, NULL, context_param_list, group_options,
(int)(timeout - ((now_usec - started_usec) / USEC_PER_MS)));
if(!base_rrdr) {
info("Metric correlations: rrd2rrdr() failed for the baseline window on chart '%s'.", st->name);
@@ -477,7 +497,7 @@ static int rrdset_metric_correlations_ks2(RRDSET *st, DICTIONARY *results,
// to spread the results evenly, 0.0 needs to be the less correlated and 1.0 the most correlated
// so we flip the result of kstwo()
- register_result(results, base_rrdr->st, d, 1.0 - prob);
+ register_result(results, base_rrdr->st, d, 1.0 - prob, RESULT_IS_BASE_HIGH_RATIO, stats);
}
}
@@ -494,8 +514,9 @@ cleanup:
static int rrdset_metric_correlations_volume(RRDSET *st, DICTIONARY *results,
long long baseline_after, long long baseline_before,
long long after, long long before,
- RRDR_OPTIONS options, RRDR_GROUPING group, int timeout, MC_STATS *stats) {
- options |= RRDR_OPTION_MATCH_IDS;
+ RRDR_OPTIONS options, RRDR_GROUPING group, const char *group_options,
+ int timeout, MC_STATS *stats) {
+ options |= RRDR_OPTION_MATCH_IDS | RRDR_OPTION_ABSOLUTE;
long group_time = 0;
int correlated_dimensions = 0;
@@ -516,46 +537,79 @@ static int rrdset_metric_correlations_volume(RRDSET *st, DICTIONARY *results,
// dimensions, and we query a single dimension at a time.
stats->db_queries++;
+ calculated_number baseline_average = NAN;
+ uint8_t base_anomaly_rate = 0;
+ value_is_null = 1;
+ ret = rrdset2value_api_v1(st, NULL, &baseline_average, d->id, 1,
+ baseline_after, baseline_before,
+ group, group_options, group_time, options,
+ NULL, NULL,
+ &stats->db_points, &stats->result_points,
+ &value_is_null, &base_anomaly_rate, 0);
+
+ if(ret != HTTP_RESP_OK || value_is_null || !calculated_number_isnumber(baseline_average)) {
+ // this means no data for the baseline window, but we may have data for the highlighted one - assume zero
+ baseline_average = 0.0;
+ }
+
+ stats->db_queries++;
calculated_number highlight_average = NAN;
+ uint8_t high_anomaly_rate = 0;
value_is_null = 1;
ret = rrdset2value_api_v1(st, NULL, &highlight_average, d->id, 1,
after, before,
- group, group_time, options,
+ group, group_options, group_time, options,
NULL, NULL,
&stats->db_points, &stats->result_points,
- &value_is_null, 0);
+ &value_is_null, &high_anomaly_rate, 0);
if(ret != HTTP_RESP_OK || value_is_null || !calculated_number_isnumber(highlight_average)) {
- // error("Metric correlations: cannot query highlight duration of dimension '%s' of chart '%s', %d %s %s %s", st->name, d->name, ret, (ret != HTTP_RESP_OK)?"response failed":"", (value_is_null)?"value is null":"", (!calculated_number_isnumber(highlight_average))?"result is NAN":"");
// this means no data for the highlighted duration - so skip it
continue;
}
+ if(baseline_average == highlight_average) {
+ // they are the same - let's move on
+ continue;
+ }
+
stats->db_queries++;
- calculated_number baseline_average = NAN;
+ calculated_number highlight_countif = NAN;
value_is_null = 1;
- ret = rrdset2value_api_v1(st, NULL, &baseline_average, d->id, 1,
- baseline_after, baseline_before,
- group, group_time, options,
+
+ char highlighted_countif_options[50 + 1];
+ snprintfz(highlighted_countif_options, 50, "%s" CALCULATED_NUMBER_FORMAT, highlight_average < baseline_average ? "<":">", baseline_average);
+
+ ret = rrdset2value_api_v1(st, NULL, &highlight_countif, d->id, 1,
+ after, before,
+ RRDR_GROUPING_COUNTIF,highlighted_countif_options,
+ group_time, options,
NULL, NULL,
&stats->db_points, &stats->result_points,
- &value_is_null, 0);
+ &value_is_null, NULL, 0);
- if(ret != HTTP_RESP_OK || value_is_null || !calculated_number_isnumber(baseline_average)) {
- // error("Metric correlations: cannot query baseline duration of dimension '%s' of chart '%s', %d %s %s %s", st->name, d->name, ret, (ret != HTTP_RESP_OK)?"response failed":"", (value_is_null)?"value is null":"", (!calculated_number_isnumber(baseline_average))?"result is NAN":"");
- // continue;
- // this means no data for the baseline window, but we have data for the highlighted one - assume zero
- baseline_average = 0.0;
+ if(ret != HTTP_RESP_OK || value_is_null || !calculated_number_isnumber(highlight_countif)) {
+ info("MC: highlighted countif query failed, but highlighted average worked - strange...");
+ continue;
}
- calculated_number pcent = NAN;
- if(isgreater(baseline_average, 0.0) || isless(baseline_average, 0.0))
- pcent = (highlight_average - baseline_average) / baseline_average;
+ // this represents the percentage of time
+ // the highlighted window was above/below the baseline window
+ // (above or below depending on their averages)
+ highlight_countif = highlight_countif / 100.0; // countif returns 0 - 100.0
- else if(isgreater(highlight_average, 0.0) || isless(highlight_average, 0.0))
- pcent = highlight_average;
+ RESULT_FLAGS flags;
+ calculated_number pcent = NAN;
+ if(isgreater(baseline_average, 0.0) || isless(baseline_average, 0.0)) {
+ flags = RESULT_IS_BASE_HIGH_RATIO;
+ pcent = (highlight_average - baseline_average) / baseline_average * highlight_countif;
+ }
+ else {
+ flags = RESULT_IS_PERCENTAGE_OF_TIME;
+ pcent = highlight_countif;
+ }
- register_result(results, st, d, pcent);
+ register_result(results, st, d, pcent, flags, stats);
}
return correlated_dimensions;
@@ -590,18 +644,23 @@ static inline int binary_search_bigger_than_calculated_number(const calculated_n
// ----------------------------------------------------------------------------
// spread the results evenly according to their value
-static size_t spread_results_evenly(DICTIONARY *results) {
+static size_t spread_results_evenly(DICTIONARY *results, MC_STATS *stats) {
struct register_result *t;
// count the dimensions
size_t dimensions = dictionary_stats_entries(results);
if(!dimensions) return 0;
+ if(stats->max_base_high_ratio == 0.0)
+ stats->max_base_high_ratio = 1.0;
+
// create an array of the right size and copy all the values in it
calculated_number slots[dimensions];
dimensions = 0;
dfe_start_read(results, t) {
- t->value = calculated_number_fabs(t->value);
+ if(t->flags & (RESULT_IS_PERCENTAGE_OF_TIME))
+ t->value = t->value * stats->max_base_high_ratio;
+
slots[dimensions++] = t->value;
}
dfe_done(t);
@@ -639,7 +698,8 @@ static size_t spread_results_evenly(DICTIONARY *results) {
// ----------------------------------------------------------------------------
// The main function
-int metric_correlations(RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD method, RRDR_GROUPING group,
+int metric_correlations(RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD method,
+ RRDR_GROUPING group, const char *group_options,
long long baseline_after, long long baseline_before,
long long after, long long before,
long long points, RRDR_OPTIONS options, int timeout) {
@@ -727,8 +787,7 @@ int metric_correlations(RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD me
while((points << shifts) > MAX_POINTS)
points = points >> 1;
- if(points < 100) {
- // error = "cannot comply to at least 100 points";
+ if(points < 15) {
resp = HTTP_RESP_BAD_REQUEST;
goto cleanup;
}
@@ -769,7 +828,7 @@ int metric_correlations(RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD me
correlated_dimensions += rrdset_metric_correlations_volume(st, results,
baseline_after, baseline_before,
after, before,
- options, group,
+ options, group, group_options,
(int)(timeout - ((now_usec - started_usec) / USEC_PER_MS)),
&stats);
break;
@@ -779,7 +838,7 @@ int metric_correlations(RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD me
correlated_dimensions += rrdset_metric_correlations_ks2(st, results,
baseline_after, baseline_before,
after, before,
- points, options, group, shifts,
+ points, options, group, group_options, shifts,
(int)(timeout - ((now_usec - started_usec) / USEC_PER_MS)),
&stats);
break;
@@ -790,7 +849,7 @@ int metric_correlations(RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD me
dfe_done(ptr);
if(!(options & RRDR_OPTION_RETURN_RAW))
- spread_results_evenly(results);
+ spread_results_evenly(results, &stats);
usec_t ended_usec = now_realtime_usec();
diff --git a/database/metric_correlations.h b/database/metric_correlations.h
index aa8944d922..4d0b4dba3b 100644
--- a/database/metric_correlations.h
+++ b/database/metric_correlations.h
@@ -14,7 +14,8 @@ extern int enable_metric_correlations;
extern int metric_correlations_version;
extern METRIC_CORRELATIONS_METHOD default_metric_correlations_method;
-extern int metric_correlations (RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD method, RRDR_GROUPING group,
+extern int metric_correlations (RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD method,
+ RRDR_GROUPING group, const char *group_options,
long long baseline_after, long long baseline_before,
long long after, long long before,
long long points, RRDR_OPTIONS options, int timeout);
diff --git a/database/ram/rrddim_mem.c b/database/ram/rrddim_mem.c
index b17f03ca50..03b6983a29 100644
--- a/database/ram/rrddim_mem.c
+++ b/database/ram/rrddim_mem.c
@@ -9,9 +9,9 @@ void rrddim_collect_init(RRDDIM *rd) {
rd->values[rd->rrdset->current_entry] = SN_EMPTY_SLOT;
rd->state->handle = calloc(1, sizeof(struct mem_collect_handle));
}
-void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, storage_number number) {
+void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, calculated_number number, SN_FLAGS flags) {
(void)point_in_time;
- rd->values[rd->rrdset->current_entry] = number;
+ rd->values[rd->rrdset->current_entry] = pack_storage_number(number, flags);
}
int rrddim_collect_finalize(RRDDIM *rd) {
free((struct mem_collect_handle*)rd->state->handle);
@@ -28,33 +28,63 @@ void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, time_t st
struct mem_query_handle* h = calloc(1, sizeof(struct mem_query_handle));
h->slot = rrdset_time2slot(rd->rrdset, start_time);
h->last_slot = rrdset_time2slot(rd->rrdset, end_time);
- h->finished = 0;
+ h->dt = rd->update_every;
+
+ h->next_timestamp = start_time;
+ h->slot_timestamp = rrdset_slot2time(rd->rrdset, h->slot);
+ h->last_timestamp = rrdset_slot2time(rd->rrdset, h->last_slot);
+
+ // info("QUERY: start %ld, end %ld, next %ld, first %ld, last %ld", start_time, end_time, h->next_timestamp, h->slot_timestamp, h->last_timestamp);
+
handle->handle = (STORAGE_QUERY_HANDLE *)h;
}
-storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *current_time) {
+// Returns the metric and sets its timestamp into current_time
+// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
+// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
+calculated_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *start_time, time_t *end_time, SN_FLAGS *flags) {
RRDDIM *rd = handle->rd;
struct mem_query_handle* h = (struct mem_query_handle*)handle->handle;
- long entries = rd->rrdset->entries;
- long slot = h->slot;
+ size_t entries = rd->rrdset->entries;
+ size_t slot = h->slot;
- (void)current_time;
- if (unlikely(h->slot == h->last_slot))
- h->finished = 1;
- storage_number n = rd->values[slot++];
+ time_t this_timestamp = h->next_timestamp;
+ h->next_timestamp += h->dt;
+
+ // set this timestamp for our caller
+ *start_time = this_timestamp - h->dt;
+ *end_time = this_timestamp;
+ if(unlikely(this_timestamp < h->slot_timestamp)) {
+ *flags = SN_EMPTY_SLOT;
+ return NAN;
+ }
+
+ if(unlikely(this_timestamp > h->last_timestamp)) {
+ *flags = SN_EMPTY_SLOT;
+ return NAN;
+ }
+
+ storage_number n = rd->values[slot++];
if(unlikely(slot >= entries)) slot = 0;
+
h->slot = slot;
+ h->slot_timestamp += h->dt;
- return n;
+ *flags = (n & SN_ALL_FLAGS);
+ return unpack_storage_number(n);
}
int rrddim_query_is_finished(struct rrddim_query_handle *handle) {
struct mem_query_handle* h = (struct mem_query_handle*)handle->handle;
- return h->finished;
+ return (h->next_timestamp > handle->end_time);
}
void rrddim_query_finalize(struct rrddim_query_handle *handle) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(!rrddim_query_is_finished(handle))
+ error("QUERY: query for chart '%s' dimension '%s' has been stopped unfinished", handle->rd->rrdset->id, handle->rd->name);
+#endif
freez(handle->handle);
}
diff --git a/database/ram/rrddim_mem.h b/database/ram/rrddim_mem.h
index 9a215387ae..ede8a4e211 100644
--- a/database/ram/rrddim_mem.h
+++ b/database/ram/rrddim_mem.h
@@ -9,18 +9,22 @@ struct mem_collect_handle {
long slot;
long entries;
};
+
struct mem_query_handle {
- long slot;
- long last_slot;
- uint8_t finished;
+ time_t dt;
+ time_t next_timestamp;
+ time_t last_timestamp;
+ time_t slot_timestamp;
+ size_t slot;
+ size_t last_slot;
};
extern void rrddim_collect_init(RRDDIM *rd);
-extern void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, storage_number number);
+extern void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, calculated_number number, SN_FLAGS flags);
extern int rrddim_collect_finalize(RRDDIM *rd);
extern void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time);
-extern storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *current_time);
+extern calculated_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *start_time, time_t *end_time, SN_FLAGS *flags);
extern int rrddim_query_is_finished(struct rrddim_query_handle *handle);
extern void rrddim_query_finalize(struct rrddim_query_handle *handle);
extern time_t rrddim_query_latest_time(RRDDIM *rd);
diff --git a/database/rrd.h b/database/rrd.h
index d17f0abb5f..76fb132b15 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -332,7 +332,7 @@ struct rrddim_collect_ops {
void (*init)(RRDDIM *rd);
// run this to store each metric into the database
- void (*store_metric)(RRDDIM *rd, usec_t point_in_time, storage_number number);
+ void (*store_metric)(RRDDIM *rd, usec_t point_in_time, calculated_number number, SN_FLAGS flags);
// an finalization function to run after collection is over
// returns 1 if it's safe to delete the dimension
@@ -345,7 +345,7 @@ struct rrddim_query_ops {
void (*init)(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time);
// run this to load each metric number from the database
- storage_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time);
+ calculated_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time, time_t *end_time, SN_FLAGS *flags);
// run this to test if the series of next_metric() database queries is finished
int (*is_finished)(struct rrddim_query_handle *handle);
@@ -1184,10 +1184,10 @@ static inline size_t rrdset_time2slot(RRDSET *st, time_t t) {
ret = rrdset_first_slot(st);
}
else {
- if(rrdset_last_slot(st) >= ((last_entry_t - t) / (size_t)(st->update_every)))
- ret = rrdset_last_slot(st) - ((last_entry_t - t) / (size_t)(st->update_every));
+ if(rrdset_last_slot(st) >= (size_t)((last_entry_t - t) / st->update_every))
+ ret = rrdset_last_slot(st) - ((last_entry_t - t) / st->update_every);
else
- ret = rrdset_last_slot(st) - ((last_entry_t - t) / (size_t)(st->update_every)) + (unsigned long)st->entries;
+ ret = rrdset_last_slot(st) - ((last_entry_t - t) / st->update_every) + st->entries;
}
}
@@ -1211,12 +1211,10 @@ static inline time_t rrdset_slot2time(RRDSET *st, size_t slot) {
slot = (size_t)st->entries - 1;
}
- if(slot > rrdset_last_slot(st)) {
- ret = last_entry_t - (size_t)st->update_every * (rrdset_last_slot(st) - slot + (size_t)st->entries);
- }
- else {
- ret = last_entry_t - (size_t)st->update_every;
- }
+ if(slot > rrdset_last_slot(st))
+ ret = last_entry_t - (time_t)(st->update_every * (rrdset_last_slot(st) - slot + (size_t)st->entries));
+ else
+ ret = last_entry_t - (time_t)(st->update_every * (rrdset_last_slot(st) - slot));
if(unlikely(ret < first_entry_t)) {
error("INTERNAL ERROR: rrdset_slot2time() on %s returns time too far in the past", st->name);
diff --git a/database/rrdset.c b/database/rrdset.c
index d872f66e2e..fe4ccc68bd 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -1130,7 +1130,7 @@ static inline size_t rrdset_done_interpolate(
size_t counter = st->counter;
long current_entry = st->current_entry;
- uint32_t storage_flags = SN_DEFAULT_FLAGS;
+ SN_FLAGS storage_flags = SN_DEFAULT_FLAGS;
if (has_reset_value)
storage_flags |= SN_EXISTS_RESET;
@@ -1234,8 +1234,7 @@ static inline size_t rrdset_done_interpolate(
if(unlikely(!store_this_entry)) {
(void) ml_is_anomalous(rd, 0, false);
- rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT);
-// rd->values[current_entry] = SN_EMPTY_SLOT;
+ rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT);
continue;
}
@@ -1247,18 +1246,8 @@ static inline size_t rrdset_done_interpolate(
dim_storage_flags &= ~ ((uint32_t) SN_ANOMALY_BIT);
}
- rd->state->collect_ops.store_metric(rd, next_store_ut, pack_storage_number(new_value, dim_storage_flags));
-// rd->values[current_entry] = pack_storage_number(new_value, storage_flags );
+ rd->state->collect_ops.store_metric(rd, next_store_ut, new_value, dim_storage_flags);
rd->last_stored_value = new_value;
-
- #ifdef NETDATA_INTERNAL_CHECKS
- rrdset_debug(st, "%s: STORE[%ld] "
- CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT
- , rd->name
- , current_entry
- , unpack_storage_number(rd->values[current_entry]), new_value
- );
- #endif
}
else {
(void) ml_is_anomalous(rd, 0, false);
@@ -1270,42 +1259,11 @@ static inline size_t rrdset_done_interpolate(
);
#endif
-// rd->values[current_entry] = SN_EMPTY_SLOT;
- rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT);
+ rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT);
rd->last_stored_value = NAN;
}
stored_entries++;
-
- #ifdef NETDATA_INTERNAL_CHECKS
- if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) {
- calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor;
- calculated_number t2 = unpack_storage_number(rd->values[current_entry]);
-
- calculated_number accuracy = accuracy_loss(t1, t2);
- debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)"
- , st->id, rd->name
- , current_entry
- , t2
- , t1
- , accuracy
- , (accuracy > ACCURACY_LOSS_ACCEPTED_PERCENT) ? " **TOO BIG** " : ""
- );
-
- rd->collected_volume += t1;
- rd->stored_volume += t2;
-
- accuracy = accuracy_loss(rd->collected_volume, rd->stored_volume);
- debug(D_RRD_STATS, "%s/%s: VOLUME[%ld] = " CALCULATED_NUMBER_FORMAT ", calculated = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s"
- , st->id, rd->name
- , current_entry
- , rd->stored_volume
- , rd->collected_volume
- , accuracy
- , (accuracy > ACCURACY_LOSS_ACCEPTED_PERCENT) ? " **TOO BIG** " : ""
- );
- }
- #endif
}
// reset the storage flags for the next point, if any;
storage_flags = SN_DEFAULT_FLAGS;
@@ -1503,6 +1461,7 @@ void rrdset_done(RRDSET *st) {
// and we have collected metrics for this chart in the past (st->counter != 0)
// fill the gap (the chart has been just loaded from disk)
if(unlikely(st->counter) && st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) {
+ // TODO this should be inside the storage engine
rrdset_done_fill_the_gap(st);
las