summaryrefslogtreecommitdiffstats
path: root/daemon
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-10-23 23:46:43 +0300
committerGitHub <noreply@github.com>2022-10-23 23:46:43 +0300
commit00712b351b3c83a54a147ca23365458acbef3105 (patch)
tree9d1614a0ce54195bc9e2d52454f0974eb9f29819 /daemon
parent9798a2b71e880a73b5b95d62d2e0c63dbc649a0e (diff)
QUERY_TARGET: new query engine for Netdata Agent (#13697)
* initial implementation of QUERY_TARGET * rrd2rrdr() interface * rrddim_find_best_tier_for_timeframe() ported * added dimension filtering * added db object in query target * rrd2rrdr() ported * working on formatters * working on jsonwrapper * finally, it compiles... * 1st run without crashes * query planer working * cleanup old code * review changes * fix also changing data collection frequency * fix signess * fix rrdlabels and dimension ordering * fixes * remove unused variable * ml should accept NULL response from rrd2rrdr() * number formatting fixes * more number formatting fixes * more number formatting fixes * support mc parallel queries * formatting and cleanup * added rrd2rrdr_legacy() as a simplified interface to run a query * make sure rrdset_find_natural_update_every_for_timeframe() returns a value * make signed comparisons * weights endpoint using rrdcontexts * fix for legacy db modes and cleanup * fix for chart_ids and remove AR chart from weights endpoint * Ignore command if not initialized yet * remove unused members * properly initialize window * code cleanup - rrddim linked list is gone; rrdset rwlock is gone too * reviewed RRDR.internal members * eliminate unnecessary members of QUERY_TARGET * more complete query ids; more detailed information on aborted queries * properly terminate option strings * query id contains group_options which is controlled by users, so escaping is necessary * tense in query id * tense in query id - again * added the remaining query options to the query id * Expose hidden option to the dimension * use the hidden flag when loading context dimensions * Specify table alias for option * dont update chart last access time, unless at least a dimension of the chart will be queried Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'daemon')
-rw-r--r--daemon/global_statistics.c11
-rw-r--r--daemon/main.c4
-rw-r--r--daemon/service.c6
-rw-r--r--daemon/unit_test.c58
4 files changed, 40 insertions, 39 deletions
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c
index 32b885738c..e81fc49d11 100644
--- a/daemon/global_statistics.c
+++ b/daemon/global_statistics.c
@@ -695,13 +695,14 @@ static void dbengine_statistics_charts(void) {
unsigned dbengine_contexts = 0, counted_multihost_db[RRD_STORAGE_TIERS] = { 0 }, i;
rrdhost_foreach_read(host) {
- if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && !rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) {
+ if (!rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) {
/* get localhost's DB engine's statistics for each tier */
- for(int tier = 0; tier < storage_tiers ;tier++) {
- if(!host->storage_instance[tier]) continue;
+ for(size_t tier = 0; tier < storage_tiers ;tier++) {
+ if(host->db[tier].mode != RRD_MEMORY_MODE_DBENGINE) continue;
+ if(!host->db[tier].instance) continue;
- if(is_storage_engine_shared(host->storage_instance[tier])) {
+ if(is_storage_engine_shared(host->db[tier].instance)) {
if(counted_multihost_db[tier])
continue;
else
@@ -709,7 +710,7 @@ static void dbengine_statistics_charts(void) {
}
++dbengine_contexts;
- rrdeng_get_37_statistics((struct rrdengine_instance *)host->storage_instance[tier], local_stats_array);
+ rrdeng_get_37_statistics((struct rrdengine_instance *)host->db[tier].instance, local_stats_array);
for (i = 0; i < RRDENG_NR_STATS; ++i) {
/* aggregate statistics across hosts */
stats_array[i] += local_stats_array[i];
diff --git a/daemon/main.c b/daemon/main.c
index ff07003625..41386e76da 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -56,7 +56,7 @@ void netdata_cleanup_and_exit(int ret) {
info("EXIT: freeing database memory...");
#ifdef ENABLE_DBENGINE
if(dbengine_enabled) {
- for (int tier = 0; tier < storage_tiers; tier++)
+ for (size_t tier = 0; tier < storage_tiers; tier++)
rrdeng_prepare_exit(multidb_ctx[tier]);
}
#endif
@@ -65,7 +65,7 @@ void netdata_cleanup_and_exit(int ret) {
metadata_sync_shutdown();
#ifdef ENABLE_DBENGINE
if(dbengine_enabled) {
- for (int tier = 0; tier < storage_tiers; tier++)
+ for (size_t tier = 0; tier < storage_tiers; tier++)
rrdeng_exit(multidb_ctx[tier]);
}
#endif
diff --git a/daemon/service.c b/daemon/service.c
index a45019813a..be1aad8164 100644
--- a/daemon/service.c
+++ b/daemon/service.c
@@ -47,11 +47,11 @@ static void svc_rrddim_obsolete_to_archive(RRDDIM *rd) {
/* only a collector can mark a chart as obsolete, so we must remove the reference */
size_t tiers_available = 0, tiers_said_yes = 0;
- for(int tier = 0; tier < storage_tiers ;tier++) {
+ for(size_t tier = 0; tier < storage_tiers ;tier++) {
if(rd->tiers[tier]) {
tiers_available++;
- if(rd->tiers[tier]->collect_ops.finalize(rd->tiers[tier]->db_collection_handle))
+ if(rd->tiers[tier]->collect_ops->finalize(rd->tiers[tier]->db_collection_handle))
tiers_said_yes++;
rd->tiers[tier]->db_collection_handle = NULL;
@@ -217,7 +217,7 @@ restart_after_removal:
if (rrdhost_option_check(host, RRDHOST_OPTION_DELETE_ORPHAN_HOST)
/* don't delete multi-host DB host files */
- && !(host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && is_storage_engine_shared(host->storage_instance[0]))
+ && !(host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && is_storage_engine_shared(host->db[0].instance))
) {
worker_is_busy(WORKER_JOB_DELETE_HOST_CHARTS);
rrdhost_delete_charts(host);
diff --git a/daemon/unit_test.c b/daemon/unit_test.c
index 32ebb2996c..1b89aecf7f 100644
--- a/daemon/unit_test.c
+++ b/daemon/unit_test.c
@@ -1818,7 +1818,7 @@ static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS
// feed it with the test data
for (i = 0 ; i < CHARTS ; ++i) {
for (j = 0 ; j < DIMS ; ++j) {
- rd[i][j]->tiers[0]->collect_ops.change_collection_frequency(rd[i][j]->tiers[0]->db_collection_handle, update_every);
+ rd[i][j]->tiers[0]->collect_ops->change_collection_frequency(rd[i][j]->tiers[0]->db_collection_handle, update_every);
rd[i][j]->last_collected_time.tv_sec =
st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = time_now;
@@ -1852,7 +1852,7 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI
int i, j, k, c, errors, update_every;
collected_number last;
NETDATA_DOUBLE value, expected;
- struct rrddim_query_handle handle;
+ struct storage_engine_query_handle handle;
size_t value_errors = 0, time_errors = 0;
update_every = REGION_UPDATE_EVERY[current_region];
@@ -1863,13 +1863,13 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI
time_now = time_start + (c + 1) * update_every;
for (i = 0 ; i < CHARTS ; ++i) {
for (j = 0; j < DIMS; ++j) {
- rd[i][j]->tiers[0]->query_ops.init(rd[i][j]->tiers[0]->db_metric_handle, &handle, time_now, time_now + QUERY_BATCH * update_every);
+ rd[i][j]->tiers[0]->query_ops->init(rd[i][j]->tiers[0]->db_metric_handle, &handle, time_now, time_now + QUERY_BATCH * update_every);
for (k = 0; k < QUERY_BATCH; ++k) {
last = ((collected_number)i * DIMS) * REGION_POINTS[current_region] +
j * REGION_POINTS[current_region] + c + k;
expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE)last, SN_DEFAULT_FLAGS));
- STORAGE_POINT sp = rd[i][j]->tiers[0]->query_ops.next_metric(&handle);
+ STORAGE_POINT sp = rd[i][j]->tiers[0]->query_ops->next_metric(&handle);
value = sp.sum;
time_retrieved = sp.start_time;
end_time = sp.end_time;
@@ -1891,7 +1891,7 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI
errors++;
}
}
- rd[i][j]->tiers[0]->query_ops.finalize(&handle);
+ rd[i][j]->tiers[0]->query_ops->finalize(&handle);
}
}
}
@@ -1922,23 +1922,22 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS]
long points = (time_end - time_start) / update_every;
for (i = 0 ; i < CHARTS ; ++i) {
ONEWAYALLOC *owa = onewayalloc_create(0);
- RRDR *r = rrd2rrdr(owa, st[i], points, time_start, time_end,
- RRDR_GROUPING_AVERAGE, 0, RRDR_OPTION_NATURAL_POINTS,
- NULL, NULL, NULL, 0, 0);
-
+ RRDR *r = rrd2rrdr_legacy(owa, st[i], points, time_start, time_end,
+ RRDR_GROUPING_AVERAGE, 0, RRDR_OPTION_NATURAL_POINTS,
+ NULL, NULL, 0, 0);
if (!r) {
fprintf(stderr, " DB-engine unittest %s: empty RRDR on region %d ### E R R O R ###\n", rrdset_name(st[i]), current_region);
return ++errors;
} else {
- assert(r->st == st[i]);
- for (c = 0; c != rrdr_rows(r) ; ++c) {
+ assert(r->internal.qt->request.st == st[i]);
+ for (c = 0; c != (long)rrdr_rows(r) ; ++c) {
RRDDIM *d;
time_now = time_start + (c + 1) * update_every;
time_retrieved = r->t[c];
// for each dimension
- rrddim_foreach_read(d, r->st) {
- if(unlikely((int)d_dfe.counter >= r->d)) break; // d_counter is provided by the dictionary dfe
+ rrddim_foreach_read(d, r->internal.qt->request.st) {
+ if(unlikely(d_dfe.counter >= r->d)) break; // d_counter is provided by the dictionary dfe
j = (int)d_dfe.counter;
@@ -2061,25 +2060,26 @@ int test_dbengine(void)
long point_offset = (time_start[current_region] - time_start[0]) / update_every;
for (i = 0 ; i < CHARTS ; ++i) {
ONEWAYALLOC *owa = onewayalloc_create(0);
- RRDR *r = rrd2rrdr(owa, st[i], points, time_start[0] + update_every,
- time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0,
- RRDR_OPTION_NATURAL_POINTS, NULL, NULL, NULL, 0, 0);
+ RRDR *r = rrd2rrdr_legacy(owa, st[i], points, time_start[0] + update_every,
+ time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0,
+ RRDR_OPTION_NATURAL_POINTS, NULL, NULL, 0, 0);
+
if (!r) {
fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", rrdset_name(st[i]));
++errors;
} else {
long c;
- assert(r->st == st[i]);
+ assert(r->internal.qt->request.st == st[i]);
// test current region values only, since they must be left unchanged
- for (c = point_offset ; c < point_offset + rrdr_rows(r) / REGIONS / 2 ; ++c) {
+ for (c = point_offset ; c < (long)(point_offset + rrdr_rows(r) / REGIONS / 2) ; ++c) {
RRDDIM *d;
time_t time_now = time_start[current_region] + (c - point_offset + 2) * update_every;
time_t time_retrieved = r->t[c];
// for each dimension
- rrddim_foreach_read(d, r->st) {
- if(unlikely((int)d_dfe.counter >= r->d)) break; // d_counter is provided by the dictionary dfe
+ rrddim_foreach_read(d, r->internal.qt->request.st) {
+ if(unlikely(d_dfe.counter >= r->d)) break; // d_counter is provided by the dictionary dfe
j = (int)d_dfe.counter;
@@ -2113,9 +2113,9 @@ int test_dbengine(void)
}
error_out:
rrd_wrlock();
- rrdeng_prepare_exit((struct rrdengine_instance *)host->storage_instance[0]);
+ rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].instance);
rrdhost_delete_charts(host);
- rrdeng_exit((struct rrdengine_instance *)host->storage_instance[0]);
+ rrdeng_exit((struct rrdengine_instance *)host->db[0].instance);
rrd_unlock();
return errors + value_errors + time_errors;
@@ -2293,7 +2293,7 @@ static void query_dbengine_chart(void *arg)
time_t time_now, time_retrieved, end_time;
collected_number generatedv;
NETDATA_DOUBLE value, expected;
- struct rrddim_query_handle handle;
+ struct storage_engine_query_handle handle;
size_t value_errors = 0, time_errors = 0;
do {
@@ -2320,13 +2320,13 @@ static void query_dbengine_chart(void *arg)
time_before = MIN(time_after + duration, time_max); /* up to 1 hour queries */
}
- rd->tiers[0]->query_ops.init(rd->tiers[0]->db_metric_handle, &handle, time_after, time_before);
+ rd->tiers[0]->query_ops->init(rd->tiers[0]->db_metric_handle, &handle, time_after, time_before);
++thread_info->queries_nr;
for (time_now = time_after ; time_now <= time_before ; time_now += update_every) {
generatedv = generate_dbengine_chart_value(i, j, time_now);
expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE) generatedv, SN_DEFAULT_FLAGS));
- if (unlikely(rd->tiers[0]->query_ops.is_finished(&handle))) {
+ if (unlikely(rd->tiers[0]->query_ops->is_finished(&handle))) {
if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT
", found data gap, ### E R R O R ###\n",
@@ -2336,7 +2336,7 @@ static void query_dbengine_chart(void *arg)
break;
}
- STORAGE_POINT sp = rd->tiers[0]->query_ops.next_metric(&handle);
+ STORAGE_POINT sp = rd->tiers[0]->query_ops->next_metric(&handle);
value = sp.sum;
time_retrieved = sp.start_time;
end_time = sp.end_time;
@@ -2374,7 +2374,7 @@ static void query_dbengine_chart(void *arg)
}
}
}
- rd->tiers[0]->query_ops.finalize(&handle);
+ rd->tiers[0]->query_ops->finalize(&handle);
} while(!thread_info->done);
if(value_errors)
@@ -2522,9 +2522,9 @@ void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsi
}
freez(query_threads);
rrd_wrlock();
- rrdeng_prepare_exit((struct rrdengine_instance *)host->storage_instance[0]);
+ rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].instance);
rrdhost_delete_charts(host);
- rrdeng_exit((struct rrdengine_instance *)host->storage_instance[0]);
+ rrdeng_exit((struct rrdengine_instance *)host->db[0].instance);
rrd_unlock();
}