From b32ca44319e35eb38e5858730f2ea44ea2268926 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Wed, 22 Jun 2022 11:19:08 +0300 Subject: Query Engine multi-granularity support (and MC improvements) (#13155) * set grouping functions * storage engine should check the validity of timestamps, not the query engine * calculate and store in RRDR anomaly rates for every query * anomaly rate used by volume metric correlations * mc volume should use absolute data, to avoid cancelling effect * return anomaly-rates in jsonwrap with jw-anomaly-rates option to data queries * don't return null on anomaly rates * allow passing group query options from the URL * added countif to the query engine and used it in metric correlations * fix configure * fix countif and anomaly rate percentages * added group_options to metric correlations; updated swagger * added newline at the end of yaml file * always check the time the highlighted window was above/below the highlighted window * properly track time in memory queries * error for internal checks only * moved pack_storage_number() into the storage engines * moved unpack_storage_number() inside the storage engines * remove old comment * pass unit tests * properly detect zero or subnormal values in pack_storage_number() * fill nulls before the value, not after * make sure math.h is included * workaround for isfinite() * fix for isfinite() * faster isfinite() alternative * fix for faster isfinite() alternative * next_metric() now returns end_time too * variable step implemented in a generic way * remove left-over variables * ensure we always complete the wanted number of points * fixes * ensure no infinite loop * mc-volume-improvements: Add information about invalid condition * points should have a duration in the past * removed unneeded info() line * Fix unit tests for exporting engine * new_point should only be checked when it is fetched from the db; better comment about the premature breaking of the main query loop Co-authored-by: Thiago 
Marques Co-authored-by: Vladimir Kobal --- daemon/unit_test.c | 114 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 81 insertions(+), 33 deletions(-) (limited to 'daemon') diff --git a/daemon/unit_test.c b/daemon/unit_test.c index 35f8613a2b..5b4213ff53 100644 --- a/daemon/unit_test.c +++ b/daemon/unit_test.c @@ -1242,6 +1242,8 @@ int run_test(struct test *test) } static int test_variable_renames(void) { + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); + fprintf(stderr, "Creating chart\n"); RRDSET *st = rrdset_create_localhost("chart", "ID", NULL, "family", "context", "Unit Testing", "a value", "unittest", NULL, 1, 1, RRDSET_TYPE_LINE); fprintf(stderr, "Created chart with id '%s', name '%s'\n", st->id, st->name); @@ -1326,6 +1328,7 @@ int check_strdupz_path_subpath() { int run_all_mockup_tests(void) { + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); if(check_strdupz_path_subpath()) return 1; @@ -1399,6 +1402,7 @@ int run_all_mockup_tests(void) int unit_test(long delay, long shift) { + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); static int repeat = 0; repeat++; @@ -1501,6 +1505,7 @@ int unit_test(long delay, long shift) } int test_sqlite(void) { + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); sqlite3 *db_meta; fprintf(stderr, "Testing SQLIte\n"); @@ -1590,6 +1595,7 @@ static const int QUERY_BATCH = 4096; static void test_dbengine_create_charts(RRDHOST *host, RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS], int update_every) { + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); int i, j; char name[101]; @@ -1637,6 +1643,7 @@ static void test_dbengine_create_charts(RRDHOST *host, RRDSET *st[CHARTS], RRDDI static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS], int current_region, time_t time_start) { + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); time_t time_now; int i, j, c, update_every; collected_number next; @@ -1672,13 +1679,15 @@ static time_t 
test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS], int current_region, time_t time_start) { + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); uint8_t same; time_t time_now, time_retrieved; int i, j, k, c, errors, update_every; collected_number last; calculated_number value, expected; - storage_number n; + SN_FLAGS nflags; struct rrddim_query_handle handle; + size_t value_errors = 0, time_errors = 0; update_every = REGION_UPDATE_EVERY[current_region]; errors = 0; @@ -1694,19 +1703,23 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI j * REGION_POINTS[current_region] + c + k; expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_DEFAULT_FLAGS)); - n = rd[i][j]->state->query_ops.next_metric(&handle, &time_retrieved); - value = unpack_storage_number(n); + time_t end_time; + value = rd[i][j]->state->query_ops.next_metric(&handle, &time_retrieved, &end_time, &nflags); same = (calculated_number_round(value) == calculated_number_round(expected)) ? 
1 : 0; if(!same) { - fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " + if(!value_errors) + fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n", st[i]->name, rd[i][j]->name, (unsigned long)time_now + k * update_every, expected, value); + value_errors++; errors++; } - if(time_retrieved != time_now + k * update_every) { - fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n", + if(end_time != time_now + k * update_every) { + if(!time_errors) + fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n", st[i]->name, rd[i][j]->name, (unsigned long)time_now + k * update_every, (unsigned long)time_retrieved); + time_errors++; errors++; } } @@ -1714,6 +1727,13 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI } } } + + if(value_errors) + fprintf(stderr, "%zu value errors encountered\n", value_errors); + + if(time_errors) + fprintf(stderr, "%zu time errors encountered\n", time_errors); + return errors; } @@ -1721,27 +1741,28 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS], int current_region, time_t time_start, time_t time_end) { + int update_every = REGION_UPDATE_EVERY[current_region]; + fprintf(stderr, "%s() running on region %d, start time %ld, end time %ld, update every %d...\n", __FUNCTION__, current_region, time_start, time_end, update_every); uint8_t same; time_t time_now, time_retrieved; - int i, j, errors, update_every; + int i, j, errors, value_errors = 0, time_errors = 0; long c; collected_number last; calculated_number value, expected; errors = 0; - update_every = REGION_UPDATE_EVERY[current_region]; long points = (time_end - time_start) / update_every; for (i = 0 ; i < CHARTS ; ++i) { ONEWAYALLOC 
*owa = onewayalloc_create(0); - RRDR *r = rrd2rrdr(owa, st[i], points, time_start + update_every, time_end, RRDR_GROUPING_AVERAGE, 0, 0, NULL, NULL, 0); + RRDR *r = rrd2rrdr(owa, st[i], points, time_start, time_end, RRDR_GROUPING_AVERAGE, 0, 0, NULL, NULL, NULL, 0); if (!r) { - fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", st[i]->name); + fprintf(stderr, " DB-engine unittest %s: empty RRDR on region %d ### E R R O R ###\n", st[i]->name, current_region); return ++errors; } else { assert(r->st == st[i]); for (c = 0; c != rrdr_rows(r) ; ++c) { RRDDIM *d; - time_now = time_start + (c + 2) * update_every; + time_now = time_start + (c + 1) * update_every; time_retrieved = r->t[c]; // for each dimension @@ -1750,20 +1771,22 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS] value = cn[j]; assert(rd[i][j] == d); - last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c + 1; + last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c; expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_DEFAULT_FLAGS)); same = (calculated_number_round(value) == calculated_number_round(expected)) ? 
1 : 0; if(!same) { - fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " + if(value_errors < 10) + fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n", st[i]->name, rd[i][j]->name, (unsigned long)time_now, expected, value); - errors++; + value_errors++; } if(time_retrieved != time_now) { - fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n", + if(!time_errors) + fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n", st[i]->name, rd[i][j]->name, (unsigned long)time_now, (unsigned long)time_retrieved); - errors++; + time_errors++; } } } @@ -1771,12 +1794,20 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS] } onewayalloc_destroy(owa); } - return errors; + + if(value_errors) + fprintf(stderr, "%d value errors encountered\n", value_errors); + + if(time_errors) + fprintf(stderr, "%d time errors encountered\n", time_errors); + + return errors + value_errors + time_errors; } int test_dbengine(void) { - int i, j, errors, update_every, current_region; + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); + int i, j, errors, value_errors = 0, time_errors = 0, update_every, current_region; RRDHOST *host = NULL; RRDSET *st[CHARTS]; RRDDIM *rd[CHARTS][DIMS]; @@ -1854,7 +1885,7 @@ int test_dbengine(void) long point_offset = (time_start[current_region] - time_start[0]) / update_every; for (i = 0 ; i < CHARTS ; ++i) { ONEWAYALLOC *owa = onewayalloc_create(0); - RRDR *r = rrd2rrdr(owa, st[i], points, time_start[0] + update_every, time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0, 0, NULL, NULL, 0); + RRDR *r = rrd2rrdr(owa, st[i], points, time_start[0] + update_every, time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0, 0, NULL, NULL, NULL, 0); if (!r) { fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R 
###\n", st[i]->name); ++errors; @@ -1879,15 +1910,17 @@ int test_dbengine(void) uint8_t same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0; if(!same) { - fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " + if(!value_errors) + fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value " CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n", st[i]->name, rd[i][j]->name, (unsigned long)time_now, expected, value); - errors++; + value_errors++; } if(time_retrieved != time_now) { - fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n", + if(!time_errors) + fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n", st[i]->name, rd[i][j]->name, (unsigned long)time_now, (unsigned long)time_retrieved); - errors++; + time_errors++; } } } @@ -1902,7 +1935,7 @@ error_out: rrdeng_exit(host->rrdeng_ctx); rrd_unlock(); - return errors; + return errors + value_errors + time_errors; } struct dbengine_chart_thread { @@ -1937,6 +1970,7 @@ collected_number generate_dbengine_chart_value(int chart_i, int dim_i, time_t ti static void generate_dbengine_chart(void *arg) { + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); struct dbengine_chart_thread *thread_info = (struct dbengine_chart_thread *)arg; RRDHOST *host = thread_info->host; char *chartname = thread_info->chartname; @@ -1989,6 +2023,7 @@ static void generate_dbengine_chart(void *arg) void generate_dbengine_dataset(unsigned history_seconds) { + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); const int DSET_CHARTS = 16; const int DSET_DIMS = 128; const uint64_t EXPECTED_COMPRESSION_RATIO = 20; @@ -2063,6 +2098,7 @@ struct dbengine_query_thread { static void query_dbengine_chart(void *arg) { + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); struct dbengine_query_thread *thread_info = (struct dbengine_query_thread *)arg; 
const int DSET_CHARTS = thread_info->dset_charts; const int DSET_DIMS = thread_info->dset_dims; @@ -2074,8 +2110,9 @@ static void query_dbengine_chart(void *arg) time_t time_now, time_retrieved; collected_number generatedv; calculated_number value, expected; - storage_number n; + SN_FLAGS nflags; struct rrddim_query_handle handle; + size_t value_errors = 0, time_errors = 0; do { // pick a chart and dimension @@ -2116,8 +2153,9 @@ static void query_dbengine_chart(void *arg) } break; } - n = rd->state->query_ops.next_metric(&handle, &time_retrieved); - if (SN_EMPTY_SLOT == n) { + time_t end_time; + value = rd->state->query_ops.next_metric(&handle, &time_retrieved, &end_time, &nflags); + if (!calculated_number_isnumber(value)) { if (!thread_info->delete_old_data) { /* data validation only when we don't delete */ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n", @@ -2127,34 +2165,44 @@ static void query_dbengine_chart(void *arg) break; } ++thread_info->queried_metrics_nr; - value = unpack_storage_number(n); same = (calculated_number_round(value) == calculated_number_round(expected)) ? 
1 : 0; if (!same) { if (!thread_info->delete_old_data) { /* data validation only when we don't delete */ - fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " + if(!value_errors) + fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n", st->name, rd->name, (unsigned long) time_now, expected, value); - ++thread_info->errors; + value_errors++; + thread_info->errors++; } } - if (time_retrieved != time_now) { + if (end_time != time_now) { if (!thread_info->delete_old_data) { /* data validation only when we don't delete */ - fprintf(stderr, + if(!time_errors) + fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n", st->name, rd->name, (unsigned long) time_now, (unsigned long) time_retrieved); - ++thread_info->errors; + time_errors++; + thread_info->errors++; } } } rd->state->query_ops.finalize(&handle); } while(!thread_info->done); + + if(value_errors) + fprintf(stderr, "%zu value errors encountered\n", value_errors); + + if(time_errors) + fprintf(stderr, "%zu time errors encountered\n", time_errors); } void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsigned QUERY_THREADS, unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB, unsigned DISK_SPACE_MB) { + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); const unsigned DSET_DIMS = 128; const uint64_t EXPECTED_COMPRESSION_RATIO = 20; const unsigned HISTORY_SECONDS = 3600 * 24 * 365 * 50; /* 50 year of history */ -- cgit v1.2.3