summaryrefslogtreecommitdiffstats
path: root/daemon
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-06-22 11:19:08 +0300
committerGitHub <noreply@github.com>2022-06-22 11:19:08 +0300
commitb32ca44319e35eb38e5858730f2ea44ea2268926 (patch)
tree40814c014d919657f6c96d55c0947631a1fb86ee /daemon
parent7b6e23e98c7c5624a76f8dd3435b41594fb5f39f (diff)
Query Engine multi-granularity support (and MC improvements) (#13155)
* set grouping functions * storage engine should check the validity of timestamps, not the query engine * calculate and store in RRDR anomaly rates for every query * anomaly rate used by volume metric correlations * mc volume should use absolute data, to avoid cancelling effect * return anomaly-rates in jasonwrap with jw-anomaly-rates option to data queries * dont return null on anomaly rates * allow passing group query options from the URL * added countif to the query engine and used it in metric correlations * fix configure * fix countif and anomaly rate percentages * added group_options to metric correlations; updated swagger * added newline at the end of yaml file * always check the time the highlighted window was above/below the highlighted window * properly track time in memory queries * error for internal checks only * moved pack_storage_number() into the storage engines * moved unpack_storage_number() inside the storage engines * remove old comment * pass unit tests * properly detect zero or subnormal values in pack_storage_number() * fill nulls before the value, not after * make sure math.h is included * workaround for isfinite() * fix for isfinite() * faster isfinite() alternative * fix for faster isfinite() alternative * next_metric() now returns end_time too * variable step implemented in a generic way * remove left-over variables * ensure we always complete the wanted number of points * fixes * ensure no infinite loop * mc-volume-improvements: Add information about invalid condition * points should have a duration in the past * removed unneeded info() line * Fix unit tests for exporting engine * new_point should only be checked when it is fetched from the db; better comment about the premature breaking of the main query loop Co-authored-by: Thiago Marques <thiagoftsm@gmail.com> Co-authored-by: Vladimir Kobal <vlad@prokk.net>
Diffstat (limited to 'daemon')
-rw-r--r--daemon/unit_test.c114
1 files changed, 81 insertions, 33 deletions
diff --git a/daemon/unit_test.c b/daemon/unit_test.c
index 35f8613a2b..5b4213ff53 100644
--- a/daemon/unit_test.c
+++ b/daemon/unit_test.c
@@ -1242,6 +1242,8 @@ int run_test(struct test *test)
}
static int test_variable_renames(void) {
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
+
fprintf(stderr, "Creating chart\n");
RRDSET *st = rrdset_create_localhost("chart", "ID", NULL, "family", "context", "Unit Testing", "a value", "unittest", NULL, 1, 1, RRDSET_TYPE_LINE);
fprintf(stderr, "Created chart with id '%s', name '%s'\n", st->id, st->name);
@@ -1326,6 +1328,7 @@ int check_strdupz_path_subpath() {
int run_all_mockup_tests(void)
{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
if(check_strdupz_path_subpath())
return 1;
@@ -1399,6 +1402,7 @@ int run_all_mockup_tests(void)
int unit_test(long delay, long shift)
{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
static int repeat = 0;
repeat++;
@@ -1501,6 +1505,7 @@ int unit_test(long delay, long shift)
}
int test_sqlite(void) {
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
sqlite3 *db_meta;
fprintf(stderr, "Testing SQLIte\n");
@@ -1590,6 +1595,7 @@ static const int QUERY_BATCH = 4096;
static void test_dbengine_create_charts(RRDHOST *host, RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
int update_every)
{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
int i, j;
char name[101];
@@ -1637,6 +1643,7 @@ static void test_dbengine_create_charts(RRDHOST *host, RRDSET *st[CHARTS], RRDDI
static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
int current_region, time_t time_start)
{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
time_t time_now;
int i, j, c, update_every;
collected_number next;
@@ -1672,13 +1679,15 @@ static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS
static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
int current_region, time_t time_start)
{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
uint8_t same;
time_t time_now, time_retrieved;
int i, j, k, c, errors, update_every;
collected_number last;
calculated_number value, expected;
- storage_number n;
+ SN_FLAGS nflags;
struct rrddim_query_handle handle;
+ size_t value_errors = 0, time_errors = 0;
update_every = REGION_UPDATE_EVERY[current_region];
errors = 0;
@@ -1694,19 +1703,23 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI
j * REGION_POINTS[current_region] + c + k;
expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_DEFAULT_FLAGS));
- n = rd[i][j]->state->query_ops.next_metric(&handle, &time_retrieved);
- value = unpack_storage_number(n);
+ time_t end_time;
+ value = rd[i][j]->state->query_ops.next_metric(&handle, &time_retrieved, &end_time, &nflags);
same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0;
if(!same) {
- fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
+ if(!value_errors)
+ fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
st[i]->name, rd[i][j]->name, (unsigned long)time_now + k * update_every, expected, value);
+ value_errors++;
errors++;
}
- if(time_retrieved != time_now + k * update_every) {
- fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
+ if(end_time != time_now + k * update_every) {
+ if(!time_errors)
+ fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
st[i]->name, rd[i][j]->name, (unsigned long)time_now + k * update_every, (unsigned long)time_retrieved);
+ time_errors++;
errors++;
}
}
@@ -1714,6 +1727,13 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI
}
}
}
+
+ if(value_errors)
+ fprintf(stderr, "%zu value errors encountered\n", value_errors);
+
+ if(time_errors)
+ fprintf(stderr, "%zu time errors encountered\n", time_errors);
+
return errors;
}
@@ -1721,27 +1741,28 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI
static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
int current_region, time_t time_start, time_t time_end)
{
+ int update_every = REGION_UPDATE_EVERY[current_region];
+ fprintf(stderr, "%s() running on region %d, start time %ld, end time %ld, update every %d...\n", __FUNCTION__, current_region, time_start, time_end, update_every);
uint8_t same;
time_t time_now, time_retrieved;
- int i, j, errors, update_every;
+ int i, j, errors, value_errors = 0, time_errors = 0;
long c;
collected_number last;
calculated_number value, expected;
errors = 0;
- update_every = REGION_UPDATE_EVERY[current_region];
long points = (time_end - time_start) / update_every;
for (i = 0 ; i < CHARTS ; ++i) {
ONEWAYALLOC *owa = onewayalloc_create(0);
- RRDR *r = rrd2rrdr(owa, st[i], points, time_start + update_every, time_end, RRDR_GROUPING_AVERAGE, 0, 0, NULL, NULL, 0);
+ RRDR *r = rrd2rrdr(owa, st[i], points, time_start, time_end, RRDR_GROUPING_AVERAGE, 0, 0, NULL, NULL, NULL, 0);
if (!r) {
- fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", st[i]->name);
+ fprintf(stderr, " DB-engine unittest %s: empty RRDR on region %d ### E R R O R ###\n", st[i]->name, current_region);
return ++errors;
} else {
assert(r->st == st[i]);
for (c = 0; c != rrdr_rows(r) ; ++c) {
RRDDIM *d;
- time_now = time_start + (c + 2) * update_every;
+ time_now = time_start + (c + 1) * update_every;
time_retrieved = r->t[c];
// for each dimension
@@ -1750,20 +1771,22 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS]
value = cn[j];
assert(rd[i][j] == d);
- last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c + 1;
+ last = i * DIMS * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c;
expected = unpack_storage_number(pack_storage_number((calculated_number)last, SN_DEFAULT_FLAGS));
same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0;
if(!same) {
- fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
+ if(value_errors < 10)
+ fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
st[i]->name, rd[i][j]->name, (unsigned long)time_now, expected, value);
- errors++;
+ value_errors++;
}
if(time_retrieved != time_now) {
- fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n",
+ if(!time_errors)
+ fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n",
st[i]->name, rd[i][j]->name, (unsigned long)time_now, (unsigned long)time_retrieved);
- errors++;
+ time_errors++;
}
}
}
@@ -1771,12 +1794,20 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS]
}
onewayalloc_destroy(owa);
}
- return errors;
+
+ if(value_errors)
+ fprintf(stderr, "%d value errors encountered\n", value_errors);
+
+ if(time_errors)
+ fprintf(stderr, "%d time errors encountered\n", time_errors);
+
+ return errors + value_errors + time_errors;
}
int test_dbengine(void)
{
- int i, j, errors, update_every, current_region;
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
+ int i, j, errors, value_errors = 0, time_errors = 0, update_every, current_region;
RRDHOST *host = NULL;
RRDSET *st[CHARTS];
RRDDIM *rd[CHARTS][DIMS];
@@ -1854,7 +1885,7 @@ int test_dbengine(void)
long point_offset = (time_start[current_region] - time_start[0]) / update_every;
for (i = 0 ; i < CHARTS ; ++i) {
ONEWAYALLOC *owa = onewayalloc_create(0);
- RRDR *r = rrd2rrdr(owa, st[i], points, time_start[0] + update_every, time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0, 0, NULL, NULL, 0);
+ RRDR *r = rrd2rrdr(owa, st[i], points, time_start[0] + update_every, time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0, 0, NULL, NULL, NULL, 0);
if (!r) {
fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", st[i]->name);
++errors;
@@ -1879,15 +1910,17 @@ int test_dbengine(void)
uint8_t same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0;
if(!same) {
- fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
+ if(!value_errors)
+ fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", RRDR found " CALCULATED_NUMBER_FORMAT ", ### E R R O R ###\n",
st[i]->name, rd[i][j]->name, (unsigned long)time_now, expected, value);
- errors++;
+ value_errors++;
}
if(time_retrieved != time_now) {
- fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n",
+ if(!time_errors)
+ fprintf(stderr, " DB-engine unittest %s/%s: at %lu secs, found RRDR timestamp %lu ### E R R O R ###\n",
st[i]->name, rd[i][j]->name, (unsigned long)time_now, (unsigned long)time_retrieved);
- errors++;
+ time_errors++;
}
}
}
@@ -1902,7 +1935,7 @@ error_out:
rrdeng_exit(host->rrdeng_ctx);
rrd_unlock();
- return errors;
+ return errors + value_errors + time_errors;
}
struct dbengine_chart_thread {
@@ -1937,6 +1970,7 @@ collected_number generate_dbengine_chart_value(int chart_i, int dim_i, time_t ti
static void generate_dbengine_chart(void *arg)
{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
struct dbengine_chart_thread *thread_info = (struct dbengine_chart_thread *)arg;
RRDHOST *host = thread_info->host;
char *chartname = thread_info->chartname;
@@ -1989,6 +2023,7 @@ static void generate_dbengine_chart(void *arg)
void generate_dbengine_dataset(unsigned history_seconds)
{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
const int DSET_CHARTS = 16;
const int DSET_DIMS = 128;
const uint64_t EXPECTED_COMPRESSION_RATIO = 20;
@@ -2063,6 +2098,7 @@ struct dbengine_query_thread {
static void query_dbengine_chart(void *arg)
{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
struct dbengine_query_thread *thread_info = (struct dbengine_query_thread *)arg;
const int DSET_CHARTS = thread_info->dset_charts;
const int DSET_DIMS = thread_info->dset_dims;
@@ -2074,8 +2110,9 @@ static void query_dbengine_chart(void *arg)
time_t time_now, time_retrieved;
collected_number generatedv;
calculated_number value, expected;
- storage_number n;
+ SN_FLAGS nflags;
struct rrddim_query_handle handle;
+ size_t value_errors = 0, time_errors = 0;
do {
// pick a chart and dimension
@@ -2116,8 +2153,9 @@ static void query_dbengine_chart(void *arg)
}
break;
}
- n = rd->state->query_ops.next_metric(&handle, &time_retrieved);
- if (SN_EMPTY_SLOT == n) {
+ time_t end_time;
+ value = rd->state->query_ops.next_metric(&handle, &time_retrieved, &end_time, &nflags);
+ if (!calculated_number_isnumber(value)) {
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found data gap, ### E R R O R ###\n",
@@ -2127,34 +2165,44 @@ static void query_dbengine_chart(void *arg)
break;
}
++thread_info->queried_metrics_nr;
- value = unpack_storage_number(n);
same = (calculated_number_round(value) == calculated_number_round(expected)) ? 1 : 0;
if (!same) {
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
- fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
+ if(!value_errors)
+ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value "
CALCULATED_NUMBER_FORMAT ", found " CALCULATED_NUMBER_FORMAT
", ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, expected, value);
- ++thread_info->errors;
+ value_errors++;
+ thread_info->errors++;
}
}
- if (time_retrieved != time_now) {
+ if (end_time != time_now) {
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
- fprintf(stderr,
+ if(!time_errors)
+ fprintf(stderr,
" DB-engine stresstest %s/%s: at %lu secs, found timestamp %lu ### E R R O R ###\n",
st->name, rd->name, (unsigned long) time_now, (unsigned long) time_retrieved);
- ++thread_info->errors;
+ time_errors++;
+ thread_info->errors++;
}
}
}
rd->state->query_ops.finalize(&handle);
} while(!thread_info->done);
+
+ if(value_errors)
+ fprintf(stderr, "%zu value errors encountered\n", value_errors);
+
+ if(time_errors)
+ fprintf(stderr, "%zu time errors encountered\n", time_errors);
}
void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsigned QUERY_THREADS,
unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB, unsigned DISK_SPACE_MB)
{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
const unsigned DSET_DIMS = 128;
const uint64_t EXPECTED_COMPRESSION_RATIO = 20;
const unsigned HISTORY_SECONDS = 3600 * 24 * 365 * 50; /* 50 year of history */