diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-10-23 23:46:43 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-23 23:46:43 +0300 |
commit | 00712b351b3c83a54a147ca23365458acbef3105 (patch) | |
tree | 9d1614a0ce54195bc9e2d52454f0974eb9f29819 /daemon | |
parent | 9798a2b71e880a73b5b95d62d2e0c63dbc649a0e (diff) |
QUERY_TARGET: new query engine for Netdata Agent (#13697)
* initial implementation of QUERY_TARGET
* rrd2rrdr() interface
* rrddim_find_best_tier_for_timeframe() ported
* added dimension filtering
* added db object in query target
* rrd2rrdr() ported
* working on formatters
* working on jsonwrapper
* finally, it compiles...
* 1st run without crashes
* query planer working
* cleanup old code
* review changes
* fix also changing data collection frequency
* fix signess
* fix rrdlabels and dimension ordering
* fixes
* remove unused variable
* ml should accept NULL response from rrd2rrdr()
* number formatting fixes
* more number formatting fixes
* more number formatting fixes
* support mc parallel queries
* formatting and cleanup
* added rrd2rrdr_legacy() as a simplified interface to run a query
* make sure rrdset_find_natural_update_every_for_timeframe() returns a value
* make signed comparisons
* weights endpoint using rrdcontexts
* fix for legacy db modes and cleanup
* fix for chart_ids and remove AR chart from weights endpoint
* Ignore command if not initialized yet
* remove unused members
* properly initialize window
* code cleanup - rrddim linked list is gone; rrdset rwlock is gone too
* reviewed RRDR.internal members
* eliminate unnecessary members of QUERY_TARGET
* more complete query ids; more detailed information on aborted queries
* properly terminate option strings
* query id contains group_options which is controlled by users, so escaping is necessary
* tense in query id
* tense in query id - again
* added the remaining query options to the query id
* Expose hidden option to the dimension
* use the hidden flag when loading context dimensions
* Specify table alias for option
* dont update chart last access time, unless at least a dimension of the chart will be queried
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'daemon')
-rw-r--r-- | daemon/global_statistics.c | 11 | ||||
-rw-r--r-- | daemon/main.c | 4 | ||||
-rw-r--r-- | daemon/service.c | 6 | ||||
-rw-r--r-- | daemon/unit_test.c | 58 |
4 files changed, 40 insertions, 39 deletions
diff --git a/daemon/global_statistics.c b/daemon/global_statistics.c index 32b885738c..e81fc49d11 100644 --- a/daemon/global_statistics.c +++ b/daemon/global_statistics.c @@ -695,13 +695,14 @@ static void dbengine_statistics_charts(void) { unsigned dbengine_contexts = 0, counted_multihost_db[RRD_STORAGE_TIERS] = { 0 }, i; rrdhost_foreach_read(host) { - if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && !rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) { + if (!rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)) { /* get localhost's DB engine's statistics for each tier */ - for(int tier = 0; tier < storage_tiers ;tier++) { - if(!host->storage_instance[tier]) continue; + for(size_t tier = 0; tier < storage_tiers ;tier++) { + if(host->db[tier].mode != RRD_MEMORY_MODE_DBENGINE) continue; + if(!host->db[tier].instance) continue; - if(is_storage_engine_shared(host->storage_instance[tier])) { + if(is_storage_engine_shared(host->db[tier].instance)) { if(counted_multihost_db[tier]) continue; else @@ -709,7 +710,7 @@ static void dbengine_statistics_charts(void) { } ++dbengine_contexts; - rrdeng_get_37_statistics((struct rrdengine_instance *)host->storage_instance[tier], local_stats_array); + rrdeng_get_37_statistics((struct rrdengine_instance *)host->db[tier].instance, local_stats_array); for (i = 0; i < RRDENG_NR_STATS; ++i) { /* aggregate statistics across hosts */ stats_array[i] += local_stats_array[i]; diff --git a/daemon/main.c b/daemon/main.c index ff07003625..41386e76da 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -56,7 +56,7 @@ void netdata_cleanup_and_exit(int ret) { info("EXIT: freeing database memory..."); #ifdef ENABLE_DBENGINE if(dbengine_enabled) { - for (int tier = 0; tier < storage_tiers; tier++) + for (size_t tier = 0; tier < storage_tiers; tier++) rrdeng_prepare_exit(multidb_ctx[tier]); } #endif @@ -65,7 +65,7 @@ void netdata_cleanup_and_exit(int ret) { metadata_sync_shutdown(); #ifdef ENABLE_DBENGINE if(dbengine_enabled) { - for (int tier = 0; tier < storage_tiers; tier++) + for (size_t tier = 0; tier < storage_tiers; tier++) rrdeng_exit(multidb_ctx[tier]); } #endif diff --git a/daemon/service.c b/daemon/service.c index a45019813a..be1aad8164 100644 --- a/daemon/service.c +++ b/daemon/service.c @@ -47,11 +47,11 @@ static void svc_rrddim_obsolete_to_archive(RRDDIM *rd) { /* only a collector can mark a chart as obsolete, so we must remove the reference */ size_t tiers_available = 0, tiers_said_yes = 0; - for(int tier = 0; tier < storage_tiers ;tier++) { + for(size_t tier = 0; tier < storage_tiers ;tier++) { if(rd->tiers[tier]) { tiers_available++; - if(rd->tiers[tier]->collect_ops.finalize(rd->tiers[tier]->db_collection_handle)) + if(rd->tiers[tier]->collect_ops->finalize(rd->tiers[tier]->db_collection_handle)) tiers_said_yes++; rd->tiers[tier]->db_collection_handle = NULL; @@ -217,7 +217,7 @@ restart_after_removal: if (rrdhost_option_check(host, RRDHOST_OPTION_DELETE_ORPHAN_HOST) /* don't delete multi-host DB host files */ - && !(host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && is_storage_engine_shared(host->storage_instance[0])) + && !(host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && is_storage_engine_shared(host->db[0].instance)) ) { worker_is_busy(WORKER_JOB_DELETE_HOST_CHARTS); rrdhost_delete_charts(host); diff --git a/daemon/unit_test.c b/daemon/unit_test.c index 32ebb2996c..1b89aecf7f 100644 --- a/daemon/unit_test.c +++ b/daemon/unit_test.c @@ -1818,7 +1818,7 @@ static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS // feed it with the test data for (i = 0 ; i < CHARTS ; ++i) { for (j = 0 ; j < DIMS ; ++j) { - rd[i][j]->tiers[0]->collect_ops.change_collection_frequency(rd[i][j]->tiers[0]->db_collection_handle, update_every); + rd[i][j]->tiers[0]->collect_ops->change_collection_frequency(rd[i][j]->tiers[0]->db_collection_handle, update_every); rd[i][j]->last_collected_time.tv_sec = st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = time_now; @@ -1852,7 +1852,7 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI int i, j, k, c, errors, update_every; collected_number last; NETDATA_DOUBLE value, expected; - struct rrddim_query_handle handle; + struct storage_engine_query_handle handle; size_t value_errors = 0, time_errors = 0; update_every = REGION_UPDATE_EVERY[current_region]; @@ -1863,13 +1863,13 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI time_now = time_start + (c + 1) * update_every; for (i = 0 ; i < CHARTS ; ++i) { for (j = 0; j < DIMS; ++j) { - rd[i][j]->tiers[0]->query_ops.init(rd[i][j]->tiers[0]->db_metric_handle, &handle, time_now, time_now + QUERY_BATCH * update_every); + rd[i][j]->tiers[0]->query_ops->init(rd[i][j]->tiers[0]->db_metric_handle, &handle, time_now, time_now + QUERY_BATCH * update_every); for (k = 0; k < QUERY_BATCH; ++k) { last = ((collected_number)i * DIMS) * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c + k; expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE)last, SN_DEFAULT_FLAGS)); - STORAGE_POINT sp = rd[i][j]->tiers[0]->query_ops.next_metric(&handle); + STORAGE_POINT sp = rd[i][j]->tiers[0]->query_ops->next_metric(&handle); value = sp.sum; time_retrieved = sp.start_time; end_time = sp.end_time; @@ -1891,7 +1891,7 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI errors++; } } - rd[i][j]->tiers[0]->query_ops.finalize(&handle); + rd[i][j]->tiers[0]->query_ops->finalize(&handle); } } } @@ -1922,23 +1922,22 @@ static int test_dbengine_check_rrdr(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS] long points = (time_end - time_start) / update_every; for (i = 0 ; i < CHARTS ; ++i) { ONEWAYALLOC *owa = onewayalloc_create(0); - RRDR *r = rrd2rrdr(owa, st[i], points, time_start, time_end, - RRDR_GROUPING_AVERAGE, 0, RRDR_OPTION_NATURAL_POINTS, - NULL, NULL, NULL, 0, 0); - + RRDR *r = rrd2rrdr_legacy(owa, st[i], points, time_start, time_end, + RRDR_GROUPING_AVERAGE, 0, RRDR_OPTION_NATURAL_POINTS, + NULL, NULL, 0, 0); if (!r) { fprintf(stderr, " DB-engine unittest %s: empty RRDR on region %d ### E R R O R ###\n", rrdset_name(st[i]), current_region); return ++errors; } else { - assert(r->st == st[i]); - for (c = 0; c != rrdr_rows(r) ; ++c) { + assert(r->internal.qt->request.st == st[i]); + for (c = 0; c != (long)rrdr_rows(r) ; ++c) { RRDDIM *d; time_now = time_start + (c + 1) * update_every; time_retrieved = r->t[c]; // for each dimension - rrddim_foreach_read(d, r->st) { - if(unlikely((int)d_dfe.counter >= r->d)) break; // d_counter is provided by the dictionary dfe + rrddim_foreach_read(d, r->internal.qt->request.st) { + if(unlikely(d_dfe.counter >= r->d)) break; // d_counter is provided by the dictionary dfe j = (int)d_dfe.counter; @@ -2061,25 +2060,26 @@ int test_dbengine(void) long point_offset = (time_start[current_region] - time_start[0]) / update_every; for (i = 0 ; i < CHARTS ; ++i) { ONEWAYALLOC *owa = onewayalloc_create(0); - RRDR *r = rrd2rrdr(owa, st[i], points, time_start[0] + update_every, - time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0, - RRDR_OPTION_NATURAL_POINTS, NULL, NULL, NULL, 0, 0); + RRDR *r = rrd2rrdr_legacy(owa, st[i], points, time_start[0] + update_every, + time_end[REGIONS - 1], RRDR_GROUPING_AVERAGE, 0, + RRDR_OPTION_NATURAL_POINTS, NULL, NULL, 0, 0); + if (!r) { fprintf(stderr, " DB-engine unittest %s: empty RRDR ### E R R O R ###\n", rrdset_name(st[i])); ++errors; } else { long c; - assert(r->st == st[i]); + assert(r->internal.qt->request.st == st[i]); // test current region values only, since they must be left unchanged - for (c = point_offset ; c < point_offset + rrdr_rows(r) / REGIONS / 2 ; ++c) { + for (c = point_offset ; c < (long)(point_offset + rrdr_rows(r) / REGIONS / 2) ; ++c) { RRDDIM *d; time_t time_now = time_start[current_region] + (c - point_offset + 2) * update_every; time_t time_retrieved = r->t[c]; // for each dimension - rrddim_foreach_read(d, r->st) { - if(unlikely((int)d_dfe.counter >= r->d)) break; // d_counter is provided by the dictionary dfe + rrddim_foreach_read(d, r->internal.qt->request.st) { + if(unlikely(d_dfe.counter >= r->d)) break; // d_counter is provided by the dictionary dfe j = (int)d_dfe.counter; @@ -2113,9 +2113,9 @@ int test_dbengine(void) } error_out: rrd_wrlock(); - rrdeng_prepare_exit((struct rrdengine_instance *)host->storage_instance[0]); + rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].instance); rrdhost_delete_charts(host); - rrdeng_exit((struct rrdengine_instance *)host->storage_instance[0]); + rrdeng_exit((struct rrdengine_instance *)host->db[0].instance); rrd_unlock(); return errors + value_errors + time_errors; @@ -2293,7 +2293,7 @@ static void query_dbengine_chart(void *arg) time_t time_now, time_retrieved, end_time; collected_number generatedv; NETDATA_DOUBLE value, expected; - struct rrddim_query_handle handle; + struct storage_engine_query_handle handle; size_t value_errors = 0, time_errors = 0; do { @@ -2320,13 +2320,13 @@ static void query_dbengine_chart(void *arg) time_before = MIN(time_after + duration, time_max); /* up to 1 hour queries */ } - rd->tiers[0]->query_ops.init(rd->tiers[0]->db_metric_handle, &handle, time_after, time_before); + rd->tiers[0]->query_ops->init(rd->tiers[0]->db_metric_handle, &handle, time_after, time_before); ++thread_info->queries_nr; for (time_now = time_after ; time_now <= time_before ; time_now += update_every) { generatedv = generate_dbengine_chart_value(i, j, time_now); expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE) generatedv, SN_DEFAULT_FLAGS)); - if (unlikely(rd->tiers[0]->query_ops.is_finished(&handle))) { + if (unlikely(rd->tiers[0]->query_ops->is_finished(&handle))) { if (!thread_info->delete_old_data) { /* data validation only when we don't delete */ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT ", found data gap, ### E R R O R ###\n", @@ -2336,7 +2336,7 @@ static void query_dbengine_chart(void *arg) break; } - STORAGE_POINT sp = rd->tiers[0]->query_ops.next_metric(&handle); + STORAGE_POINT sp = rd->tiers[0]->query_ops->next_metric(&handle); value = sp.sum; time_retrieved = sp.start_time; end_time = sp.end_time; @@ -2374,7 +2374,7 @@ static void query_dbengine_chart(void *arg) } } } - rd->tiers[0]->query_ops.finalize(&handle); + rd->tiers[0]->query_ops->finalize(&handle); } while(!thread_info->done); if(value_errors) @@ -2522,9 +2522,9 @@ void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsi } freez(query_threads); rrd_wrlock(); - rrdeng_prepare_exit((struct rrdengine_instance *)host->storage_instance[0]); + rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].instance); rrdhost_delete_charts(host); - rrdeng_exit((struct rrdengine_instance *)host->storage_instance[0]); + rrdeng_exit((struct rrdengine_instance *)host->db[0].instance); rrd_unlock(); } |