diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-06-22 11:19:08 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-22 11:19:08 +0300 |
commit | b32ca44319e35eb38e5858730f2ea44ea2268926 (patch) | |
tree | 40814c014d919657f6c96d55c0947631a1fb86ee /database | |
parent | 7b6e23e98c7c5624a76f8dd3435b41594fb5f39f (diff) |
Query Engine multi-granularity support (and MC improvements) (#13155)
* set grouping functions
* storage engine should check the validity of timestamps, not the query engine
* calculate and store in RRDR anomaly rates for every query
* anomaly rate used by volume metric correlations
* mc volume should use absolute data, to avoid cancelling effect
* return anomaly-rates in jsonwrap with jw-anomaly-rates option to data queries
* don't return null on anomaly rates
* allow passing group query options from the URL
* added countif to the query engine and used it in metric correlations
* fix configure
* fix countif and anomaly rate percentages
* added group_options to metric correlations; updated swagger
* added newline at the end of yaml file
* always check the time the highlighted window was above/below the baseline window
* properly track time in memory queries
* error for internal checks only
* moved pack_storage_number() into the storage engines
* moved unpack_storage_number() inside the storage engines
* remove old comment
* pass unit tests
* properly detect zero or subnormal values in pack_storage_number()
* fill nulls before the value, not after
* make sure math.h is included
* workaround for isfinite()
* fix for isfinite()
* faster isfinite() alternative
* fix for faster isfinite() alternative
* next_metric() now returns end_time too
* variable step implemented in a generic way
* remove left-over variables
* ensure we always complete the wanted number of points
* fixes
* ensure no infinite loop
* mc-volume-improvements: Add information about invalid condition
* points should have a duration in the past
* removed unneeded info() line
* Fix unit tests for exporting engine
* new_point should only be checked when it is fetched from the db; better comment about the premature breaking of the main query loop
Co-authored-by: Thiago Marques <thiagoftsm@gmail.com>
Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'database')
-rwxr-xr-x | database/engine/rrdengineapi.c | 48 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 4 | ||||
-rw-r--r-- | database/metric_correlations.c | 129 | ||||
-rw-r--r-- | database/metric_correlations.h | 3 | ||||
-rw-r--r-- | database/ram/rrddim_mem.c | 54 | ||||
-rw-r--r-- | database/ram/rrddim_mem.h | 14 | ||||
-rw-r--r-- | database/rrd.h | 20 | ||||
-rw-r--r-- | database/rrdset.c | 51 |
8 files changed, 196 insertions, 127 deletions
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index ceead28c41..0792ebe001 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -193,8 +193,10 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd) handle->descr = NULL; } -void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number) +void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, calculated_number n, SN_FLAGS flags) { + storage_number number = pack_storage_number(n, flags); + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)rd->state->handle; struct rrdengine_instance *ctx; struct page_cache *pg_cache; @@ -509,6 +511,8 @@ unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t e */ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time) { + // fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time); + struct rrdeng_query_handle *handle; struct rrdengine_instance *ctx; unsigned pages_nr; @@ -520,6 +524,8 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand handle = callocz(1, sizeof(struct rrdeng_query_handle)); handle->next_page_time = start_time; handle->now = start_time; + handle->dt = rd->update_every * USEC_PER_SEC; + handle->dt_sec = rd->update_every; handle->position = 0; handle->ctx = ctx; handle->descr = NULL; @@ -527,7 +533,7 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand pages_nr = pg_cache_preload(ctx, rd->state->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC, NULL, &handle->page_index); if (unlikely(NULL == handle->page_index || 0 == pages_nr)) - /* there are no metrics to load */ + // there are no metrics to load handle->next_page_time = INVALID_TIME; } @@ -585,40 +591,50 @@ static int rrdeng_load_page_next(struct rrddim_query_handle 
*rrdimm_handle) { usec_t entries = handle->entries = page_length / sizeof(storage_number); if (likely(entries > 1)) handle->dt = (page_end_time - descr->start_time) / (entries - 1); - else - handle->dt = 0; - handle->dt_sec = handle->dt / USEC_PER_SEC; + handle->dt_sec = (time_t)(handle->dt / USEC_PER_SEC); handle->position = position; return 0; } -/* Returns the metric and sets its timestamp into current_time */ -storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) { +// Returns the metric and sets its timestamp into current_time +// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags) +// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES +calculated_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *start_time, time_t *end_time, SN_FLAGS *flags) { struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle; - if (unlikely(INVALID_TIME == handle->next_page_time)) - return SN_EMPTY_SLOT; - struct rrdeng_page_descr *descr = handle->descr; unsigned position = handle->position + 1; time_t now = handle->now + handle->dt_sec; + if (unlikely(INVALID_TIME == handle->next_page_time)) { + handle->next_page_time = INVALID_TIME; + handle->now = now; + *start_time = now - handle->dt_sec; + *end_time = now; + *flags = SN_EMPTY_SLOT; + return NAN; + } + if (unlikely(!descr || position >= handle->entries)) { // We need to get a new page if(rrdeng_load_page_next(rrdimm_handle)) { // next calls will not load any more metrics handle->next_page_time = INVALID_TIME; - return SN_EMPTY_SLOT; + handle->now = now; + *start_time = now - handle->dt_sec; + *end_time = now; + *flags = SN_EMPTY_SLOT; + return NAN; } descr = handle->descr; position = handle->position; - now = (descr->start_time + position * handle->dt) / USEC_PER_SEC; + now = (time_t)((descr->start_time + position * handle->dt) / USEC_PER_SEC); } - 
storage_number ret = handle->page[position]; + storage_number n = handle->page[position]; handle->position = position; handle->now = now; @@ -627,8 +643,10 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle handle->next_page_time = INVALID_TIME; } - *current_time = now; - return ret; + *flags = n & SN_ALL_FLAGS; + *start_time = now - handle->dt_sec; + *end_time = now; + return unpack_storage_number(n); } int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle) diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index 67443c1539..798545d9d7 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -40,14 +40,14 @@ extern void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + extern void rrdeng_metric_init(RRDDIM *rd); extern void rrdeng_store_metric_init(RRDDIM *rd); extern void rrdeng_store_metric_flush_current_page(RRDDIM *rd); -extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number number); +extern void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, calculated_number number, SN_FLAGS flags); extern int rrdeng_store_metric_finalize(RRDDIM *rd); extern unsigned rrdeng_variable_step_boundaries(RRDSET *st, time_t start_time, time_t end_time, struct rrdeng_region_info **region_info_arrayp, unsigned *max_intervalp, struct context_param *context_param_list); extern void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time); -extern storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time); +extern calculated_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *start_time, time_t *end_time, SN_FLAGS *flags); extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle); extern void rrdeng_load_metric_finalize(struct rrddim_query_handle 
*rrdimm_handle); extern time_t rrdeng_metric_latest_time(RRDDIM *rd); diff --git a/database/metric_correlations.c b/database/metric_correlations.c index 925768f5da..148f2acbce 100644 --- a/database/metric_correlations.c +++ b/database/metric_correlations.c @@ -9,6 +9,7 @@ int metric_correlations_version = 1; METRIC_CORRELATIONS_METHOD default_metric_correlations_method = METRIC_CORRELATIONS_KS2; typedef struct mc_stats { + calculated_number max_base_high_ratio; size_t db_points; size_t result_points; size_t db_queries; @@ -46,7 +47,13 @@ const char *mc_method_to_string(METRIC_CORRELATIONS_METHOD method) { // ---------------------------------------------------------------------------- // The results per dimension are aggregated into a dictionary +typedef enum { + RESULT_IS_BASE_HIGH_RATIO = (1 << 0), + RESULT_IS_PERCENTAGE_OF_TIME = (1 << 1), +} RESULT_FLAGS; + struct register_result { + RESULT_FLAGS flags; RRDSET *st; const char *chart_id; const char *context; @@ -86,15 +93,26 @@ static void register_result_destroy(DICTIONARY *results) { dictionary_destroy(results); } -static void register_result(DICTIONARY *results, RRDSET *st, RRDDIM *d, calculated_number value) { +static void register_result(DICTIONARY *results, RRDSET *st, RRDDIM *d, calculated_number value, RESULT_FLAGS flags, MC_STATS *stats) { if(!calculated_number_isnumber(value)) return; + // make it positive + calculated_number v = calculated_number_fabs(value); + + // no need to store zero scored values + if(v == 0.0) return; + + // keep track of the max of the baseline / highlight ratio + if(flags & RESULT_IS_BASE_HIGH_RATIO && v > stats->max_base_high_ratio) + stats->max_base_high_ratio = v; + struct register_result t = { + .flags = flags, .st = st, .chart_id = st->id, .context = st->context, .dim_name = d->name, - .value = value + .value = v }; char buf[5000 + 1]; @@ -361,7 +379,8 @@ static double kstwo(calculated_number baseline[], int baseline_points, calculate static int 
rrdset_metric_correlations_ks2(RRDSET *st, DICTIONARY *results, long long baseline_after, long long baseline_before, long long after, long long before, - long long points, RRDR_OPTIONS options, RRDR_GROUPING group, + long long points, RRDR_OPTIONS options, + RRDR_GROUPING group, const char *group_options, uint32_t shifts, int timeout, MC_STATS *stats) { long group_time = 0; struct context_param *context_param_list = NULL; @@ -377,7 +396,8 @@ static int rrdset_metric_correlations_ks2(RRDSET *st, DICTIONARY *results, ONEWAYALLOC *owa = onewayalloc_create(0); high_rrdr = rrd2rrdr(owa, st, points, after, before, group, - group_time, options, NULL, context_param_list, timeout); + group_time, options, NULL, context_param_list, group_options, + timeout); if(!high_rrdr) { info("Metric correlations: rrd2rrdr() failed for the highlighted window on chart '%s'.", st->name); goto cleanup; @@ -402,7 +422,7 @@ static int rrdset_metric_correlations_ks2(RRDSET *st, DICTIONARY *results, stats->db_queries++; base_rrdr = rrd2rrdr(owa, st,high_points << shifts, baseline_after, baseline_before, group, - group_time, options, NULL, context_param_list, + group_time, options, NULL, context_param_list, group_options, (int)(timeout - ((now_usec - started_usec) / USEC_PER_MS))); if(!base_rrdr) { info("Metric correlations: rrd2rrdr() failed for the baseline window on chart '%s'.", st->name); @@ -477,7 +497,7 @@ static int rrdset_metric_correlations_ks2(RRDSET *st, DICTIONARY *results, // to spread the results evenly, 0.0 needs to be the less correlated and 1.0 the most correlated // so we flip the result of kstwo() - register_result(results, base_rrdr->st, d, 1.0 - prob); + register_result(results, base_rrdr->st, d, 1.0 - prob, RESULT_IS_BASE_HIGH_RATIO, stats); } } @@ -494,8 +514,9 @@ cleanup: static int rrdset_metric_correlations_volume(RRDSET *st, DICTIONARY *results, long long baseline_after, long long baseline_before, long long after, long long before, - RRDR_OPTIONS options, RRDR_GROUPING 
group, int timeout, MC_STATS *stats) { - options |= RRDR_OPTION_MATCH_IDS; + RRDR_OPTIONS options, RRDR_GROUPING group, const char *group_options, + int timeout, MC_STATS *stats) { + options |= RRDR_OPTION_MATCH_IDS | RRDR_OPTION_ABSOLUTE; long group_time = 0; int correlated_dimensions = 0; @@ -516,46 +537,79 @@ static int rrdset_metric_correlations_volume(RRDSET *st, DICTIONARY *results, // dimensions, and we query a single dimension at a time. stats->db_queries++; + calculated_number baseline_average = NAN; + uint8_t base_anomaly_rate = 0; + value_is_null = 1; + ret = rrdset2value_api_v1(st, NULL, &baseline_average, d->id, 1, + baseline_after, baseline_before, + group, group_options, group_time, options, + NULL, NULL, + &stats->db_points, &stats->result_points, + &value_is_null, &base_anomaly_rate, 0); + + if(ret != HTTP_RESP_OK || value_is_null || !calculated_number_isnumber(baseline_average)) { + // this means no data for the baseline window, but we may have data for the highlighted one - assume zero + baseline_average = 0.0; + } + + stats->db_queries++; calculated_number highlight_average = NAN; + uint8_t high_anomaly_rate = 0; value_is_null = 1; ret = rrdset2value_api_v1(st, NULL, &highlight_average, d->id, 1, after, before, - group, group_time, options, + group, group_options, group_time, options, NULL, NULL, &stats->db_points, &stats->result_points, - &value_is_null, 0); + &value_is_null, &high_anomaly_rate, 0); if(ret != HTTP_RESP_OK || value_is_null || !calculated_number_isnumber(highlight_average)) { - // error("Metric correlations: cannot query highlight duration of dimension '%s' of chart '%s', %d %s %s %s", st->name, d->name, ret, (ret != HTTP_RESP_OK)?"response failed":"", (value_is_null)?"value is null":"", (!calculated_number_isnumber(highlight_average))?"result is NAN":""); // this means no data for the highlighted duration - so skip it continue; } + if(baseline_average == highlight_average) { + // they are the same - let's move on + continue; + } 
+ stats->db_queries++; - calculated_number baseline_average = NAN; + calculated_number highlight_countif = NAN; value_is_null = 1; - ret = rrdset2value_api_v1(st, NULL, &baseline_average, d->id, 1, - baseline_after, baseline_before, - group, group_time, options, + + char highlighted_countif_options[50 + 1]; + snprintfz(highlighted_countif_options, 50, "%s" CALCULATED_NUMBER_FORMAT, highlight_average < baseline_average ? "<":">", baseline_average); + + ret = rrdset2value_api_v1(st, NULL, &highlight_countif, d->id, 1, + after, before, + RRDR_GROUPING_COUNTIF,highlighted_countif_options, + group_time, options, NULL, NULL, &stats->db_points, &stats->result_points, - &value_is_null, 0); + &value_is_null, NULL, 0); - if(ret != HTTP_RESP_OK || value_is_null || !calculated_number_isnumber(baseline_average)) { - // error("Metric correlations: cannot query baseline duration of dimension '%s' of chart '%s', %d %s %s %s", st->name, d->name, ret, (ret != HTTP_RESP_OK)?"response failed":"", (value_is_null)?"value is null":"", (!calculated_number_isnumber(baseline_average))?"result is NAN":""); - // continue; - // this means no data for the baseline window, but we have data for the highlighted one - assume zero - baseline_average = 0.0; + if(ret != HTTP_RESP_OK || value_is_null || !calculated_number_isnumber(highlight_countif)) { + info("MC: highlighted countif query failed, but highlighted average worked - strange..."); + continue; } - calculated_number pcent = NAN; - if(isgreater(baseline_average, 0.0) || isless(baseline_average, 0.0)) - pcent = (highlight_average - baseline_average) / baseline_average; + // this represents the percentage of time + // the highlighted window was above/below the baseline window + // (above or below depending on their averages) + highlight_countif = highlight_countif / 100.0; // countif returns 0 - 100.0 - else if(isgreater(highlight_average, 0.0) || isless(highlight_average, 0.0)) - pcent = highlight_average; + RESULT_FLAGS flags; + 
calculated_number pcent = NAN; + if(isgreater(baseline_average, 0.0) || isless(baseline_average, 0.0)) { + flags = RESULT_IS_BASE_HIGH_RATIO; + pcent = (highlight_average - baseline_average) / baseline_average * highlight_countif; + } + else { + flags = RESULT_IS_PERCENTAGE_OF_TIME; + pcent = highlight_countif; + } - register_result(results, st, d, pcent); + register_result(results, st, d, pcent, flags, stats); } return correlated_dimensions; @@ -590,18 +644,23 @@ static inline int binary_search_bigger_than_calculated_number(const calculated_n // ---------------------------------------------------------------------------- // spread the results evenly according to their value -static size_t spread_results_evenly(DICTIONARY *results) { +static size_t spread_results_evenly(DICTIONARY *results, MC_STATS *stats) { struct register_result *t; // count the dimensions size_t dimensions = dictionary_stats_entries(results); if(!dimensions) return 0; + if(stats->max_base_high_ratio == 0.0) + stats->max_base_high_ratio = 1.0; + // create an array of the right size and copy all the values in it calculated_number slots[dimensions]; dimensions = 0; dfe_start_read(results, t) { - t->value = calculated_number_fabs(t->value); + if(t->flags & (RESULT_IS_PERCENTAGE_OF_TIME)) + t->value = t->value * stats->max_base_high_ratio; + slots[dimensions++] = t->value; } dfe_done(t); @@ -639,7 +698,8 @@ static size_t spread_results_evenly(DICTIONARY *results) { // ---------------------------------------------------------------------------- // The main function -int metric_correlations(RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD method, RRDR_GROUPING group, +int metric_correlations(RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD method, + RRDR_GROUPING group, const char *group_options, long long baseline_after, long long baseline_before, long long after, long long before, long long points, RRDR_OPTIONS options, int timeout) { @@ -727,8 +787,7 @@ int metric_correlations(RRDHOST 
*host, BUFFER *wb, METRIC_CORRELATIONS_METHOD me while((points << shifts) > MAX_POINTS) points = points >> 1; - if(points < 100) { - // error = "cannot comply to at least 100 points"; + if(points < 15) { resp = HTTP_RESP_BAD_REQUEST; goto cleanup; } @@ -769,7 +828,7 @@ int metric_correlations(RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD me correlated_dimensions += rrdset_metric_correlations_volume(st, results, baseline_after, baseline_before, after, before, - options, group, + options, group, group_options, (int)(timeout - ((now_usec - started_usec) / USEC_PER_MS)), &stats); break; @@ -779,7 +838,7 @@ int metric_correlations(RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD me correlated_dimensions += rrdset_metric_correlations_ks2(st, results, baseline_after, baseline_before, after, before, - points, options, group, shifts, + points, options, group, group_options, shifts, (int)(timeout - ((now_usec - started_usec) / USEC_PER_MS)), &stats); break; @@ -790,7 +849,7 @@ int metric_correlations(RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD me dfe_done(ptr); if(!(options & RRDR_OPTION_RETURN_RAW)) - spread_results_evenly(results); + spread_results_evenly(results, &stats); usec_t ended_usec = now_realtime_usec(); diff --git a/database/metric_correlations.h b/database/metric_correlations.h index aa8944d922..4d0b4dba3b 100644 --- a/database/metric_correlations.h +++ b/database/metric_correlations.h @@ -14,7 +14,8 @@ extern int enable_metric_correlations; extern int metric_correlations_version; extern METRIC_CORRELATIONS_METHOD default_metric_correlations_method; -extern int metric_correlations (RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD method, RRDR_GROUPING group, +extern int metric_correlations (RRDHOST *host, BUFFER *wb, METRIC_CORRELATIONS_METHOD method, + RRDR_GROUPING group, const char *group_options, long long baseline_after, long long baseline_before, long long after, long long before, long long points, RRDR_OPTIONS options, int 
timeout); diff --git a/database/ram/rrddim_mem.c b/database/ram/rrddim_mem.c index b17f03ca50..03b6983a29 100644 --- a/database/ram/rrddim_mem.c +++ b/database/ram/rrddim_mem.c @@ -9,9 +9,9 @@ void rrddim_collect_init(RRDDIM *rd) { rd->values[rd->rrdset->current_entry] = SN_EMPTY_SLOT; rd->state->handle = calloc(1, sizeof(struct mem_collect_handle)); } -void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, storage_number number) { +void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, calculated_number number, SN_FLAGS flags) { (void)point_in_time; - rd->values[rd->rrdset->current_entry] = number; + rd->values[rd->rrdset->current_entry] = pack_storage_number(number, flags); } int rrddim_collect_finalize(RRDDIM *rd) { free((struct mem_collect_handle*)rd->state->handle); @@ -28,33 +28,63 @@ void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, time_t st struct mem_query_handle* h = calloc(1, sizeof(struct mem_query_handle)); h->slot = rrdset_time2slot(rd->rrdset, start_time); h->last_slot = rrdset_time2slot(rd->rrdset, end_time); - h->finished = 0; + h->dt = rd->update_every; + + h->next_timestamp = start_time; + h->slot_timestamp = rrdset_slot2time(rd->rrdset, h->slot); + h->last_timestamp = rrdset_slot2time(rd->rrdset, h->last_slot); + + // info("QUERY: start %ld, end %ld, next %ld, first %ld, last %ld", start_time, end_time, h->next_timestamp, h->slot_timestamp, h->last_timestamp); + handle->handle = (STORAGE_QUERY_HANDLE *)h; } -storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *current_time) { +// Returns the metric and sets its timestamp into current_time +// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags) +// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES +calculated_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *start_time, time_t *end_time, SN_FLAGS *flags) { RRDDIM *rd = 
handle->rd; struct mem_query_handle* h = (struct mem_query_handle*)handle->handle; - long entries = rd->rrdset->entries; - long slot = h->slot; + size_t entries = rd->rrdset->entries; + size_t slot = h->slot; - (void)current_time; - if (unlikely(h->slot == h->last_slot)) - h->finished = 1; - storage_number n = rd->values[slot++]; + time_t this_timestamp = h->next_timestamp; + h->next_timestamp += h->dt; + + // set this timestamp for our caller + *start_time = this_timestamp - h->dt; + *end_time = this_timestamp; + if(unlikely(this_timestamp < h->slot_timestamp)) { + *flags = SN_EMPTY_SLOT; + return NAN; + } + + if(unlikely(this_timestamp > h->last_timestamp)) { + *flags = SN_EMPTY_SLOT; + return NAN; + } + + storage_number n = rd->values[slot++]; if(unlikely(slot >= entries)) slot = 0; + h->slot = slot; + h->slot_timestamp += h->dt; - return n; + *flags = (n & SN_ALL_FLAGS); + return unpack_storage_number(n); } int rrddim_query_is_finished(struct rrddim_query_handle *handle) { struct mem_query_handle* h = (struct mem_query_handle*)handle->handle; - return h->finished; + return (h->next_timestamp > handle->end_time); } void rrddim_query_finalize(struct rrddim_query_handle *handle) { +#ifdef NETDATA_INTERNAL_CHECKS + if(!rrddim_query_is_finished(handle)) + error("QUERY: query for chart '%s' dimension '%s' has been stopped unfinished", handle->rd->rrdset->id, handle->rd->name); +#endif freez(handle->handle); } diff --git a/database/ram/rrddim_mem.h b/database/ram/rrddim_mem.h index 9a215387ae..ede8a4e211 100644 --- a/database/ram/rrddim_mem.h +++ b/database/ram/rrddim_mem.h @@ -9,18 +9,22 @@ struct mem_collect_handle { long slot; long entries; }; + struct mem_query_handle { - long slot; - long last_slot; - uint8_t finished; + time_t dt; + time_t next_timestamp; + time_t last_timestamp; + time_t slot_timestamp; + size_t slot; + size_t last_slot; }; extern void rrddim_collect_init(RRDDIM *rd); -extern void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, 
storage_number number); +extern void rrddim_collect_store_metric(RRDDIM *rd, usec_t point_in_time, calculated_number number, SN_FLAGS flags); extern int rrddim_collect_finalize(RRDDIM *rd); extern void rrddim_query_init(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time); -extern storage_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *current_time); +extern calculated_number rrddim_query_next_metric(struct rrddim_query_handle *handle, time_t *start_time, time_t *end_time, SN_FLAGS *flags); extern int rrddim_query_is_finished(struct rrddim_query_handle *handle); extern void rrddim_query_finalize(struct rrddim_query_handle *handle); extern time_t rrddim_query_latest_time(RRDDIM *rd); diff --git a/database/rrd.h b/database/rrd.h index d17f0abb5f..76fb132b15 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -332,7 +332,7 @@ struct rrddim_collect_ops { void (*init)(RRDDIM *rd); // run this to store each metric into the database - void (*store_metric)(RRDDIM *rd, usec_t point_in_time, storage_number number); + void (*store_metric)(RRDDIM *rd, usec_t point_in_time, calculated_number number, SN_FLAGS flags); // an finalization function to run after collection is over // returns 1 if it's safe to delete the dimension @@ -345,7 +345,7 @@ struct rrddim_query_ops { void (*init)(RRDDIM *rd, struct rrddim_query_handle *handle, time_t start_time, time_t end_time); // run this to load each metric number from the database - storage_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time); + calculated_number (*next_metric)(struct rrddim_query_handle *handle, time_t *current_time, time_t *end_time, SN_FLAGS *flags); // run this to test if the series of next_metric() database queries is finished int (*is_finished)(struct rrddim_query_handle *handle); @@ -1184,10 +1184,10 @@ static inline size_t rrdset_time2slot(RRDSET *st, time_t t) { ret = rrdset_first_slot(st); } else { - if(rrdset_last_slot(st) >= 
((last_entry_t - t) / (size_t)(st->update_every))) - ret = rrdset_last_slot(st) - ((last_entry_t - t) / (size_t)(st->update_every)); + if(rrdset_last_slot(st) >= (size_t)((last_entry_t - t) / st->update_every)) + ret = rrdset_last_slot(st) - ((last_entry_t - t) / st->update_every); else - ret = rrdset_last_slot(st) - ((last_entry_t - t) / (size_t)(st->update_every)) + (unsigned long)st->entries; + ret = rrdset_last_slot(st) - ((last_entry_t - t) / st->update_every) + st->entries; } } @@ -1211,12 +1211,10 @@ static inline time_t rrdset_slot2time(RRDSET *st, size_t slot) { slot = (size_t)st->entries - 1; } - if(slot > rrdset_last_slot(st)) { - ret = last_entry_t - (size_t)st->update_every * (rrdset_last_slot(st) - slot + (size_t)st->entries); - } - else { - ret = last_entry_t - (size_t)st->update_every; - } + if(slot > rrdset_last_slot(st)) + ret = last_entry_t - (time_t)(st->update_every * (rrdset_last_slot(st) - slot + (size_t)st->entries)); + else + ret = last_entry_t - (time_t)(st->update_every * (rrdset_last_slot(st) - slot)); if(unlikely(ret < first_entry_t)) { error("INTERNAL ERROR: rrdset_slot2time() on %s returns time too far in the past", st->name); diff --git a/database/rrdset.c b/database/rrdset.c index d872f66e2e..fe4ccc68bd 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -1130,7 +1130,7 @@ static inline size_t rrdset_done_interpolate( size_t counter = st->counter; long current_entry = st->current_entry; - uint32_t storage_flags = SN_DEFAULT_FLAGS; + SN_FLAGS storage_flags = SN_DEFAULT_FLAGS; if (has_reset_value) storage_flags |= SN_EXISTS_RESET; @@ -1234,8 +1234,7 @@ static inline size_t rrdset_done_interpolate( if(unlikely(!store_this_entry)) { (void) ml_is_anomalous(rd, 0, false); - rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); -// rd->values[current_entry] = SN_EMPTY_SLOT; + rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT); continue; } @@ -1247,18 +1246,8 @@ static inline size_t 
rrdset_done_interpolate( dim_storage_flags &= ~ ((uint32_t) SN_ANOMALY_BIT); } - rd->state->collect_ops.store_metric(rd, next_store_ut, pack_storage_number(new_value, dim_storage_flags)); -// rd->values[current_entry] = pack_storage_number(new_value, storage_flags ); + rd->state->collect_ops.store_metric(rd, next_store_ut, new_value, dim_storage_flags); rd->last_stored_value = new_value; - - #ifdef NETDATA_INTERNAL_CHECKS - rrdset_debug(st, "%s: STORE[%ld] " - CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT - , rd->name - , current_entry - , unpack_storage_number(rd->values[current_entry]), new_value - ); - #endif } else { (void) ml_is_anomalous(rd, 0, false); @@ -1270,42 +1259,11 @@ static inline size_t rrdset_done_interpolate( ); #endif -// rd->values[current_entry] = SN_EMPTY_SLOT; - rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT); + rd->state->collect_ops.store_metric(rd, next_store_ut, NAN, SN_EMPTY_SLOT); rd->last_stored_value = NAN; } stored_entries++; - - #ifdef NETDATA_INTERNAL_CHECKS - if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) { - calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor; - calculated_number t2 = unpack_storage_number(rd->values[current_entry]); - - calculated_number accuracy = accuracy_loss(t1, t2); - debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)" - , st->id, rd->name - , current_entry - , t2 - , t1 - , accuracy - , (accuracy > ACCURACY_LOSS_ACCEPTED_PERCENT) ? 
" **TOO BIG** " : "" - ); - - rd->collected_volume += t1; - rd->stored_volume += t2; - - accuracy = accuracy_loss(rd->collected_volume, rd->stored_volume); - debug(D_RRD_STATS, "%s/%s: VOLUME[%ld] = " CALCULATED_NUMBER_FORMAT ", calculated = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s" - , st->id, rd->name - , current_entry - , rd->stored_volume - , rd->collected_volume - , accuracy - , (accuracy > ACCURACY_LOSS_ACCEPTED_PERCENT) ? " **TOO BIG** " : "" - ); - } - #endif } // reset the storage flags for the next point, if any; storage_flags = SN_DEFAULT_FLAGS; @@ -1503,6 +1461,7 @@ void rrdset_done(RRDSET *st) { // and we have collected metrics for this chart in the past (st->counter != 0) // fill the gap (the chart has been just loaded from disk) if(unlikely(st->counter) && st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE) { + // TODO this should be inside the storage engine rrdset_done_fill_the_gap(st); las |