From 204dd9ae272445d13f308badb07e99675fa34892 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Fri, 7 Apr 2023 21:25:01 +0300 Subject: Boost dbengine (#14832) * configure extent cache size * workers can now execute up to 10 jobs in a run, boosting query prep and extent reads * fix dispatched and executing counters * boost to the max * increase libuv worker threads * query prep always gets more prio than extent reads; stop processing in batch when the dbengine queue is critical * fix accounting of query prep * inlining of time-grouping functions, to speed up queries with billions of points * make switching based on a local const variable * print one pending contexts loading message per iteration * inlined store engine query API * inlined storage engine data collection api * inlined all storage engine query ops * eliminate and inline data collection ops * simplified query group-by * more error handling * optimized partial trimming of group-by queries * preparative work to support multiple passes of group-by * more preparative work to support multiple passes of group-by (accepts multiple group-by params) * unified query timings * unified query timings - weights endpoint * query target is no longer a static thread variable - there is a list of cached query targets, each of which is freed every 1000 queries * fix query memory accounting * added summary.dimension[].pri and sorted summary.dimensions based on priority and then name * limit max ACLK WEB response size to 30MB * the response type should be text/plain * more preparative work for multiple group-by passes * create functions for generating group by keys, ids and names * multiple group-by passes are now supported * parse group-by options array also with an index * implemented percentage-of-instance group by function * family is now merged in multi-node contexts * prevent uninitialized use --- aclk/aclk_query.c | 8 + daemon/main.c | 2 +- daemon/service.c | 2 +- daemon/unit_test.c | 16 +- database/contexts/api_v2.c 
| 70 +- database/contexts/query_target.c | 201 ++- database/contexts/rrdcontext.h | 77 +- database/engine/pagecache.c | 10 +- database/engine/pagecache.h | 2 +- database/engine/pdc.c | 6 +- database/engine/rrdengine.c | 160 +- database/engine/rrdengine.h | 9 +- database/engine/rrdengineapi.c | 3 + database/ram/rrddim_mem.c | 6 +- database/ram/rrddim_mem.h | 4 +- database/rrd.h | 258 +++- database/rrddim.c | 14 +- database/rrdset.c | 23 +- database/sqlite/sqlite_aclk_node.c | 7 +- database/storage_engine.c | 54 +- exporting/process_data.c | 10 +- ml/ml.cc | 13 +- streaming/replication.c | 15 +- web/api/formatters/json_wrapper.c | 208 ++- web/api/formatters/rrd2json.c | 8 - web/api/formatters/rrd2json.h | 4 - web/api/formatters/value/value.c | 4 +- web/api/netdata-swagger.yaml | 1 + web/api/queries/average/average.c | 52 - web/api/queries/average/average.h | 57 +- web/api/queries/countif/countif.c | 129 -- web/api/queries/countif/countif.h | 143 +- web/api/queries/des/des.c | 129 -- web/api/queries/des/des.h | 133 +- web/api/queries/incremental_sum/incremental_sum.c | 59 - web/api/queries/incremental_sum/incremental_sum.h | 64 +- web/api/queries/max/max.c | 50 - web/api/queries/max/max.h | 54 +- web/api/queries/median/median.c | 134 -- web/api/queries/median/median.h | 146 +- web/api/queries/min/min.c | 50 - web/api/queries/min/min.h | 54 +- web/api/queries/percentile/percentile.c | 163 -- web/api/queries/percentile/percentile.h | 175 ++- web/api/queries/query.c | 1688 ++++++++++++--------- web/api/queries/query.h | 24 +- web/api/queries/rrdr.c | 4 +- web/api/queries/rrdr.h | 17 +- web/api/queries/ses/ses.c | 82 - web/api/queries/ses/ses.h | 87 +- web/api/queries/stddev/stddev.c | 112 -- web/api/queries/stddev/stddev.h | 118 +- web/api/queries/sum/sum.c | 46 - web/api/queries/sum/sum.h | 51 +- web/api/queries/trimmed_mean/trimmed_mean.c | 159 -- web/api/queries/trimmed_mean/trimmed_mean.h | 171 ++- web/api/queries/weights.c | 97 +- web/api/web_api_v2.c | 85 +- 
web/server/web_client.h | 1 + 59 files changed, 3112 insertions(+), 2417 deletions(-) diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 6ad2c43b70..1d00cdca3e 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -5,6 +5,7 @@ #include "aclk_tx_msgs.h" #define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" +#define ACLK_MAX_WEB_RESPONSE_SIZE (30 * 1024 * 1024) pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; @@ -22,6 +23,13 @@ static usec_t aclk_web_api_request(RRDHOST *host, struct web_client *w, char *ur else w->response.code = web_client_api_request_v1(host, w, url); + if(buffer_strlen(w->response.data) > ACLK_MAX_WEB_RESPONSE_SIZE) { + buffer_flush(w->response.data); + buffer_strcat(w->response.data, "response is too big"); + w->response.data->content_type = CT_TEXT_PLAIN; + w->response.code = HTTP_RESP_CONTENT_TOO_LONG; + } + t = now_monotonic_high_precision_usec() - t; if (aclk_stats_enabled) { diff --git a/daemon/main.c b/daemon/main.c index 832a59b955..682106b78e 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1834,7 +1834,7 @@ int main(int argc, char **argv) { #endif // set libuv worker threads - libuv_worker_threads = (int)get_netdata_cpus() * 2; + libuv_worker_threads = (int)get_netdata_cpus() * 6; if(libuv_worker_threads < MIN_LIBUV_WORKER_THREADS) libuv_worker_threads = MIN_LIBUV_WORKER_THREADS; diff --git a/daemon/service.c b/daemon/service.c index 9761abd02d..57c7c7f398 100644 --- a/daemon/service.c +++ b/daemon/service.c @@ -55,7 +55,7 @@ static void svc_rrddim_obsolete_to_archive(RRDDIM *rd) { if(rd->tiers[tier].db_collection_handle) { tiers_available++; - if(rd->tiers[tier].collect_ops->finalize(rd->tiers[tier].db_collection_handle)) + if(storage_engine_store_finalize(rd->tiers[tier].db_collection_handle)) tiers_said_no_retention++; rd->tiers[tier].db_collection_handle = NULL; diff --git a/daemon/unit_test.c b/daemon/unit_test.c index 4dd4b29931..fa3fa847f2 100644 --- 
a/daemon/unit_test.c +++ b/daemon/unit_test.c @@ -1937,7 +1937,7 @@ static time_t test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS // feed it with the test data for (i = 0 ; i < CHARTS ; ++i) { for (j = 0 ; j < DIMS ; ++j) { - rd[i][j]->tiers[0].collect_ops->change_collection_frequency(rd[i][j]->tiers[0].db_collection_handle, update_every); + storage_engine_store_change_collection_frequency(rd[i][j]->tiers[0].db_collection_handle, update_every); rd[i][j]->last_collected_time.tv_sec = st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = time_now; @@ -1988,13 +1988,13 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI time_now = time_start + (c + 1) * update_every; for (i = 0 ; i < CHARTS ; ++i) { for (j = 0; j < DIMS; ++j) { - rd[i][j]->tiers[0].query_ops->init(rd[i][j]->tiers[0].db_metric_handle, &handle, time_now, time_now + QUERY_BATCH * update_every, STORAGE_PRIORITY_NORMAL); + storage_engine_query_init(rd[i][j]->tiers[0].backend, rd[i][j]->tiers[0].db_metric_handle, &handle, time_now, time_now + QUERY_BATCH * update_every, STORAGE_PRIORITY_NORMAL); for (k = 0; k < QUERY_BATCH; ++k) { last = ((collected_number)i * DIMS) * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c + k; expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE)last, SN_DEFAULT_FLAGS)); - STORAGE_POINT sp = rd[i][j]->tiers[0].query_ops->next_metric(&handle); + STORAGE_POINT sp = storage_engine_query_next_metric(&handle); value = sp.sum; time_retrieved = sp.start_time_s; end_time = sp.end_time_s; @@ -2016,7 +2016,7 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI errors++; } } - rd[i][j]->tiers[0].query_ops->finalize(&handle); + storage_engine_query_finalize(&handle); } } } @@ -2444,13 +2444,13 @@ static void query_dbengine_chart(void *arg) time_before = MIN(time_after + duration, time_max); /* up to 1 hour queries */ } - 
rd->tiers[0].query_ops->init(rd->tiers[0].db_metric_handle, &handle, time_after, time_before, STORAGE_PRIORITY_NORMAL); + storage_engine_query_init(rd->tiers[0].backend, rd->tiers[0].db_metric_handle, &handle, time_after, time_before, STORAGE_PRIORITY_NORMAL); ++thread_info->queries_nr; for (time_now = time_after ; time_now <= time_before ; time_now += update_every) { generatedv = generate_dbengine_chart_value(i, j, time_now); expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE) generatedv, SN_DEFAULT_FLAGS)); - if (unlikely(rd->tiers[0].query_ops->is_finished(&handle))) { + if (unlikely(storage_engine_query_is_finished(&handle))) { if (!thread_info->delete_old_data) { /* data validation only when we don't delete */ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT ", found data gap, ### E R R O R ###\n", @@ -2460,7 +2460,7 @@ static void query_dbengine_chart(void *arg) break; } - STORAGE_POINT sp = rd->tiers[0].query_ops->next_metric(&handle); + STORAGE_POINT sp = storage_engine_query_next_metric(&handle); value = sp.sum; time_retrieved = sp.start_time_s; end_time = sp.end_time_s; @@ -2498,7 +2498,7 @@ static void query_dbengine_chart(void *arg) } } } - rd->tiers[0].query_ops->finalize(&handle); + storage_engine_query_finalize(&handle); } while(!thread_info->done); if(value_errors) diff --git a/database/contexts/api_v2.c b/database/contexts/api_v2.c index 6d5d104994..0d753b8789 100644 --- a/database/contexts/api_v2.c +++ b/database/contexts/api_v2.c @@ -112,6 +112,8 @@ struct rrdcontext_to_json_v2_data { SIMPLE_PATTERN *pattern; FTS_INDEX fts; } q; + + struct query_timings timings; }; static FTS_MATCH rrdcontext_to_json_v2_full_text_search(struct rrdcontext_to_json_v2_data *ctl, RRDCONTEXT *rc, SIMPLE_PATTERN *q) { @@ -194,7 +196,7 @@ static ssize_t rrdcontext_to_json_v2_add_context(void *data, RRDCONTEXT_ACQUIRED struct rrdcontext_to_json_v2_entry t = { .count = 0, .id = rc->id, - .family = 
rc->family, + .family = string_dup(rc->family), .priority = rc->priority, .first_time_s = rc->first_time_s, .last_time_s = rc->last_time_s, @@ -219,6 +221,10 @@ static ssize_t rrdcontext_to_json_v2_add_context(void *data, RRDCONTEXT_ACQUIRED if(z->last_time_s < rc->last_time_s) z->last_time_s = rc->last_time_s; + + if(z->family != rc->family) { + z->family = string_2way_merge(z->family, rc->family); + } } return 1; @@ -248,7 +254,7 @@ static ssize_t rrdcontext_to_json_v2_add_host(void *data, RRDHOST *host, bool qu struct rrdcontext_to_json_v2_data *ctl = data; BUFFER *wb = ctl->wb; - if(ctl->request->timeout_ms && now_monotonic_usec() > ctl->request->timings.received_ut + ctl->request->timeout_ms * USEC_PER_MS) + if(ctl->request->timeout_ms && now_monotonic_usec() > ctl->timings.received_ut + ctl->request->timeout_ms * USEC_PER_MS) // timed out return -2; @@ -384,7 +390,22 @@ static void buffer_json_contexts_v2_options_to_array(BUFFER *wb, CONTEXTS_V2_OPT buffer_json_add_array_item_string(wb, "search"); } -void buffer_json_agents_array_v2(BUFFER *wb, time_t now_s) { +void buffer_json_query_timings(BUFFER *wb, const char *key, struct query_timings *timings) { + timings->finished_ut = now_monotonic_usec(); + if(!timings->executed_ut) + timings->executed_ut = timings->finished_ut; + if(!timings->preprocessed_ut) + timings->preprocessed_ut = timings->received_ut; + buffer_json_member_add_object(wb, key); + buffer_json_member_add_double(wb, "prep_ms", (NETDATA_DOUBLE)(timings->preprocessed_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "query_ms", (NETDATA_DOUBLE)(timings->executed_ut - timings->preprocessed_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "output_ms", (NETDATA_DOUBLE)(timings->finished_ut - timings->executed_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "total_ms", (NETDATA_DOUBLE)(timings->finished_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "cloud_ms", 
(NETDATA_DOUBLE)(timings->finished_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_object_close(wb); +} + +void buffer_json_agents_array_v2(BUFFER *wb, struct query_timings *timings, time_t now_s) { if(!now_s) now_s = now_realtime_sec(); @@ -395,15 +416,30 @@ void buffer_json_agents_array_v2(BUFFER *wb, time_t now_s) { buffer_json_member_add_string(wb, "nm", rrdhost_hostname(localhost)); buffer_json_member_add_time_t(wb, "now", now_s); buffer_json_member_add_uint64(wb, "ai", 0); + + if(timings) + buffer_json_query_timings(wb, "timings", timings); + buffer_json_object_close(wb); buffer_json_array_close(wb); } +void buffer_json_cloud_timings(BUFFER *wb, const char *key, struct query_timings *timings) { + buffer_json_member_add_object(wb, key); + buffer_json_member_add_double(wb, "routing_ms", 0.0); + buffer_json_member_add_double(wb, "node_max_ms", 0.0); + buffer_json_member_add_double(wb, "total_ms", (NETDATA_DOUBLE)(timings->finished_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_object_close(wb); +} + +void contexts_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { + struct rrdcontext_to_json_v2_entry *z = value; + string_freez(z->family); +} + int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTEXTS_V2_OPTIONS options) { int resp = HTTP_RESP_OK; - req->timings.processing_ut = now_monotonic_usec(); - if(options & CONTEXTS_V2_SEARCH) options |= CONTEXTS_V2_CONTEXTS; @@ -418,15 +454,22 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE .contexts.pattern = string_to_simple_pattern(req->contexts), .contexts.scope_pattern = string_to_simple_pattern(req->scope_contexts), .q.pattern = string_to_simple_pattern_nocase(req->q), + .timings = { + .received_ut = now_monotonic_usec(), + } }; - if(options & CONTEXTS_V2_CONTEXTS) - ctl.ctx = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | 
DICT_OPTION_FIXED_SIZE, NULL, sizeof(struct rrdcontext_to_json_v2_entry)); + if(options & CONTEXTS_V2_CONTEXTS) { + ctl.ctx = dictionary_create_advanced( + DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, + sizeof(struct rrdcontext_to_json_v2_entry)); + + dictionary_register_delete_callback(ctl.ctx, contexts_delete_callback, NULL); + } time_t now_s = now_realtime_sec(); buffer_json_initialize(wb, "\"", "\"", 0, true, false); buffer_json_member_add_uint64(wb, "api", 2); - buffer_json_agents_array_v2(wb, now_s); if(options & CONTEXTS_V2_DEBUG) { buffer_json_member_add_object(wb, "request"); @@ -473,7 +516,7 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE if(options & (CONTEXTS_V2_NODES | CONTEXTS_V2_NODES_DETAILED | CONTEXTS_V2_DEBUG)) buffer_json_array_close(wb); - req->timings.output_ut = now_monotonic_usec(); + ctl.timings.executed_ut = now_monotonic_usec(); version_hashes_api_v2(wb, &ctl.versions); if(options & CONTEXTS_V2_CONTEXTS) { @@ -506,13 +549,8 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE buffer_json_object_close(wb); } - req->timings.finished_ut = now_monotonic_usec(); - buffer_json_member_add_object(wb, "timings"); - buffer_json_member_add_double(wb, "prep_ms", (NETDATA_DOUBLE)(req->timings.processing_ut - req->timings.received_ut) / USEC_PER_MS); - buffer_json_member_add_double(wb, "query_ms", (NETDATA_DOUBLE)(req->timings.output_ut - req->timings.processing_ut) / USEC_PER_MS); - buffer_json_member_add_double(wb, "output_ms", (NETDATA_DOUBLE)(req->timings.finished_ut - req->timings.output_ut) / USEC_PER_MS); - buffer_json_member_add_double(wb, "total_ms", (NETDATA_DOUBLE)(req->timings.finished_ut - req->timings.received_ut) / USEC_PER_MS); - buffer_json_object_close(wb); + buffer_json_agents_array_v2(wb, &ctl.timings, now_s); + buffer_json_cloud_timings(wb, "timings", &ctl.timings); buffer_json_finalize(wb); cleanup: diff --git 
a/database/contexts/query_target.c b/database/contexts/query_target.c index 0378f52017..19ab8caadd 100644 --- a/database/contexts/query_target.c +++ b/database/contexts/query_target.c @@ -12,42 +12,55 @@ static void query_instance_release(QUERY_INSTANCE *qi); static void query_context_release(QUERY_CONTEXT *qc); static void query_node_release(QUERY_NODE *qn); -static __thread QUERY_TARGET thread_query_target = {}; - -// ---------------------------------------------------------------------------- -// query API - -typedef struct query_target_locals { - time_t start_s; - - QUERY_TARGET *qt; +static __thread QUERY_TARGET *thread_qt = NULL; +static struct { + struct { + SPINLOCK spinlock; + size_t count; + QUERY_TARGET *base; + } available; - RRDSET *st; + struct { + SPINLOCK spinlock; + size_t count; + QUERY_TARGET *base; + } used; +} query_target_base = { + .available = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .base = NULL, + .count = 0, + }, + .used = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .base = NULL, + .count = 0, + }, +}; + +static void query_target_destroy(QUERY_TARGET *qt) { + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->query.size * sizeof(*qt->query.array), __ATOMIC_RELAXED); + freez(qt->query.array); - const char *scope_nodes; - const char *scope_contexts; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->dimensions.size * sizeof(*qt->dimensions.array), __ATOMIC_RELAXED); + freez(qt->dimensions.array); - const char *nodes; - const char *contexts; - const char *instances; - const char *dimensions; - const char *chart_label_key; - const char *labels; - const char *alerts; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->instances.size * sizeof(*qt->instances.array), __ATOMIC_RELAXED); + freez(qt->instances.array); - long long after; - long long before; - bool match_ids; - bool match_names; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, 
qt->contexts.size * sizeof(*qt->contexts.array), __ATOMIC_RELAXED); + freez(qt->contexts.array); - size_t metrics_skipped_due_to_not_matching_timeframe; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->nodes.size * sizeof(*qt->nodes.array), __ATOMIC_RELAXED); + freez(qt->nodes.array); - char host_uuid_buffer[UUID_STR_LEN]; - QUERY_NODE *qn; // temp to pass on callbacks, ignore otherwise - no need to free -} QUERY_TARGET_LOCALS; + freez(qt); +} void query_target_release(QUERY_TARGET *qt) { - if(unlikely(!qt || !qt->used)) return; + if(unlikely(!qt)) return; + + internal_fatal(!qt->internal.used, "QUERY TARGET: qt to be released is not used"); simple_pattern_free(qt->nodes.scope_pattern); qt->nodes.scope_pattern = NULL; @@ -113,44 +126,91 @@ void query_target_release(QUERY_TARGET *qt) { qt->db.first_time_s = 0; qt->db.last_time_s = 0; - qt->group_by.used = 0; + for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES ;g++) + qt->group_by[g].used = 0; qt->id[0] = '\0'; - qt->used = false; + netdata_spinlock_lock(&query_target_base.used.spinlock); + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(query_target_base.used.base, qt, internal.prev, internal.next); + query_target_base.used.count--; + netdata_spinlock_unlock(&query_target_base.used.spinlock); + + qt->internal.used = false; + thread_qt = NULL; + + if (qt->internal.queries > 1000) { + query_target_destroy(qt); + } + else { + netdata_spinlock_lock(&query_target_base.available.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(query_target_base.available.base, qt, internal.prev, internal.next); + query_target_base.available.count++; + netdata_spinlock_unlock(&query_target_base.available.spinlock); + } } -void query_target_free(void) { - QUERY_TARGET *qt = &thread_query_target; - if(qt->used) - query_target_release(qt); +static QUERY_TARGET *query_target_get(void) { + netdata_spinlock_lock(&query_target_base.available.spinlock); + QUERY_TARGET *qt = query_target_base.available.base; + if (qt) { + 
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(query_target_base.available.base, qt, internal.prev, internal.next); + query_target_base.available.count--; + } + netdata_spinlock_unlock(&query_target_base.available.spinlock); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->query.size * sizeof(QUERY_METRIC), __ATOMIC_RELAXED); - freez(qt->query.array); - qt->query.array = NULL; - qt->query.size = 0; + if(unlikely(!qt)) + qt = callocz(1, sizeof(*qt)); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->dimensions.size * sizeof(RRDMETRIC_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->dimensions.array); - qt->dimensions.array = NULL; - qt->dimensions.size = 0; + netdata_spinlock_lock(&query_target_base.used.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(query_target_base.used.base, qt, internal.prev, internal.next); + query_target_base.used.count++; + netdata_spinlock_unlock(&query_target_base.used.spinlock); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->instances.array); - qt->instances.array = NULL; - qt->instances.size = 0; + qt->internal.used = true; + qt->internal.queries++; + thread_qt = qt; - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->contexts.array); - qt->contexts.array = NULL; - qt->contexts.size = 0; + return qt; +} - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->nodes.size * sizeof(RRDHOST *), __ATOMIC_RELAXED); - freez(qt->nodes.array); - qt->nodes.array = NULL; - qt->nodes.size = 0; +// this is used to release a query target from a cancelled thread +void query_target_free(void) { + query_target_release(thread_qt); } +// ---------------------------------------------------------------------------- +// query API + +typedef struct query_target_locals { + time_t start_s; + + QUERY_TARGET 
*qt; + + RRDSET *st; + + const char *scope_nodes; + const char *scope_contexts; + + const char *nodes; + const char *contexts; + const char *instances; + const char *dimensions; + const char *chart_label_key; + const char *labels; + const char *alerts; + + long long after; + long long before; + bool match_ids; + bool match_names; + + size_t metrics_skipped_due_to_not_matching_timeframe; + + char host_uuid_buffer[UUID_STR_LEN]; + QUERY_NODE *qn; // temp to pass on callbacks, ignore otherwise - no need to free +} QUERY_TARGET_LOCALS; + struct storage_engine *query_metric_storage_engine(QUERY_TARGET *qt, QUERY_METRIC *qm, size_t tier) { QUERY_NODE *qn = query_node(qt, qm->link.query_node_id); return qn->rrdhost->db[tier].eng; @@ -198,8 +258,8 @@ static bool query_metric_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CON tier_retention[tier].db_metric_handle = eng->api.metric_get(qn->rrdhost->db[tier].instance, &rm->uuid); if(tier_retention[tier].db_metric_handle) { - tier_retention[tier].db_first_time_s = tier_retention[tier].eng->api.query_ops.oldest_time_s(tier_retention[tier].db_metric_handle); - tier_retention[tier].db_last_time_s = tier_retention[tier].eng->api.query_ops.latest_time_s(tier_retention[tier].db_metric_handle); + tier_retention[tier].db_first_time_s = storage_engine_oldest_time_s(tier_retention[tier].eng->backend, tier_retention[tier].db_metric_handle); + tier_retention[tier].db_last_time_s = storage_engine_latest_time_s(tier_retention[tier].eng->backend, tier_retention[tier].db_metric_handle); if(!common_first_time_s) common_first_time_s = tier_retention[tier].db_first_time_s; @@ -301,7 +361,7 @@ static inline void query_dimension_release(QUERY_DIMENSION *qd) { qd->rma = NULL; } -static QUERY_DIMENSION *query_dimension_allocate(QUERY_TARGET *qt, RRDMETRIC_ACQUIRED *rma, QUERY_STATUS status) { +static QUERY_DIMENSION *query_dimension_allocate(QUERY_TARGET *qt, RRDMETRIC_ACQUIRED *rma, QUERY_STATUS status, size_t priority) { 
if(qt->dimensions.used == qt->dimensions.size) { size_t old_mem = qt->dimensions.size * sizeof(*qt->dimensions.array); qt->dimensions.size = query_target_realloc_size(qt->dimensions.size, 4); @@ -316,12 +376,13 @@ static QUERY_DIMENSION *query_dimension_allocate(QUERY_TARGET *qt, RRDMETRIC_ACQ qd->slot = qt->dimensions.used++; qd->rma = rrdmetric_acquired_dup(rma); qd->status = status; + qd->priority = priority; return qd; } static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CONTEXT *qc, QUERY_INSTANCE *qi, - RRDMETRIC_ACQUIRED *rma, bool queryable_instance, size_t *metrics_added) { + RRDMETRIC_ACQUIRED *rma, bool queryable_instance, size_t *metrics_added, size_t priority) { QUERY_TARGET *qt = qtl->qt; RRDMETRIC *rm = rrdmetric_acquired_value(rma); @@ -364,7 +425,7 @@ static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_ // the user selection does not match this dimension // but, we may still need to query it - if (qt->request.options & RRDR_OPTION_PERCENTAGE) { + if (query_target_needs_all_dimensions(qt)) { // this is percentage calculation // so, we need this dimension to calculate the percentage needed = true; @@ -389,7 +450,7 @@ static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_ status |= QUERY_STATUS_DIMENSION_HIDDEN; options |= RRDR_DIMENSION_HIDDEN; - if (qt->request.options & RRDR_OPTION_PERCENTAGE) { + if (query_target_needs_all_dimensions(qt)) { // this is percentage calculation // so, we need this dimension to calculate the percentage needed = true; @@ -430,7 +491,7 @@ static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_ if(undo) return false; - query_dimension_allocate(qt, rma, status); + query_dimension_allocate(qt, rma, status, priority); return true; } @@ -694,17 +755,17 @@ static bool query_instance_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_C if(queryable_instance && qt->request.version >= 2) 
query_target_eval_instance_rrdcalc(qtl, qn, qc, qi); - size_t dimensions_added = 0, metrics_added = 0; + size_t dimensions_added = 0, metrics_added = 0, priority = 0; if(unlikely(qt->request.rma)) { - if(query_dimension_add(qtl, qn, qc, qi, qt->request.rma, queryable_instance, &metrics_added)) + if(query_dimension_add(qtl, qn, qc, qi, qt->request.rma, queryable_instance, &metrics_added, priority++)) dimensions_added++; } else { RRDMETRIC *rm; dfe_start_read(ri->rrdmetrics, rm) { if(query_dimension_add(qtl, qn, qc, qi, (RRDMETRIC_ACQUIRED *) rm_dfe.item, - queryable_instance, &metrics_added)) + queryable_instance, &metrics_added, priority++)) dimensions_added++; } dfe_done(rm); @@ -957,13 +1018,7 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) { if(!service_running(ABILITY_DATA_QUERIES)) return NULL; - QUERY_TARGET *qt = &thread_query_target; - - if(qt->used) - fatal("QUERY TARGET: this query target is already used (%zu queries made with this QUERY_TARGET so far).", qt->queries); - - qt->used = true; - qt->queries++; + QUERY_TARGET *qt = query_target_get(); if(!qtr->received_ut) qtr->received_ut = now_monotonic_usec(); @@ -985,7 +1040,11 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) { query_target_generate_name(qt); qt->window.after = qt->request.after; qt->window.before = qt->request.before; + qt->window.options = qt->request.options; + if(query_target_has_percentage_of_instance(qt)) + qt->window.options &= ~RRDR_OPTION_PERCENTAGE; + rrdr_relative_window_to_absolute(&qt->window.after, &qt->window.before, &qt->window.now); // prepare our local variables - we need these across all these functions diff --git a/database/contexts/rrdcontext.h b/database/contexts/rrdcontext.h index f42c43135b..5093e1d614 100644 --- a/database/contexts/rrdcontext.h +++ b/database/contexts/rrdcontext.h @@ -201,6 +201,7 @@ typedef struct query_instance { typedef struct query_dimension { uint32_t slot; + uint32_t priority; RRDMETRIC_ACQUIRED *rma; 
QUERY_STATUS status; } QUERY_DIMENSION; @@ -231,7 +232,8 @@ typedef struct query_metric { STORAGE_POINT query_points; struct { - size_t slot; + uint32_t slot; + uint32_t first_slot; STRING *id; STRING *name; STRING *units; @@ -241,9 +243,16 @@ typedef struct query_metric { } QUERY_METRIC; #define MAX_QUERY_TARGET_ID_LENGTH 255 +#define MAX_QUERY_GROUP_BY_PASSES 2 typedef bool (*qt_interrupt_callback_t)(void *data); +struct group_by_pass { + RRDR_GROUP_BY group_by; + char *group_by_label; + RRDR_GROUP_BY_FUNCTION aggregation; +}; + typedef struct query_target_request { size_t version; @@ -284,9 +293,7 @@ typedef struct query_target_request { const char *time_group_options; // group by across multiple time-series - RRDR_GROUP_BY group_by; - char *group_by_label; - RRDR_GROUP_BY_FUNCTION group_by_aggregate_function; + struct group_by_pass group_by[MAX_QUERY_GROUP_BY_PASSES]; usec_t received_ut; @@ -313,15 +320,19 @@ struct query_versions { uint64_t alerts_soft_hash; }; +struct query_timings { + usec_t received_ut; + usec_t preprocessed_ut; + usec_t executed_ut; + usec_t finished_ut; +}; + #define query_view_update_every(qt) ((qt)->window.group * (qt)->window.query_granularity) typedef struct query_target { char id[MAX_QUERY_TARGET_ID_LENGTH + 1]; // query identifier (for logging) QUERY_TARGET_REQUEST request; - bool used; // when true, this query is currently being used - size_t queries; // how many query we have done so far with this QUERY_TARGET - not related to database queries - struct { time_t now; // the current timestamp, the absolute max for any query timestamp bool relative; // true when the request made with relative timestamps, true if it was absolute @@ -388,19 +399,20 @@ typedef struct query_target { struct { size_t used; - char *label_keys[GROUP_BY_MAX_LABEL_KEYS]; - } group_by; + char *label_keys[GROUP_BY_MAX_LABEL_KEYS * MAX_QUERY_GROUP_BY_PASSES]; + } group_by[MAX_QUERY_GROUP_BY_PASSES]; STORAGE_POINT query_points; - struct query_versions versions; + 
struct query_timings timings; struct { - usec_t received_ut; - usec_t preprocessed_ut; - usec_t executed_ut; - usec_t finished_ut; - } timings; + SPINLOCK spinlock; + bool used; // when true, this query is currently being used + size_t queries; // how many query we have done so far with this QUERY_TARGET - not related to database queries + struct query_target *prev; + struct query_target *next; + } internal; } QUERY_TARGET; static inline NEVERNULL QUERY_NODE *query_node(QUERY_TARGET *qt, size_t id) { @@ -455,13 +467,6 @@ struct api_v2_contexts_request { char *contexts; char *q; - struct { - usec_t received_ut; - usec_t processing_ut; - usec_t output_ut; - usec_t finished_ut; - } timings; - time_t timeout_ms; qt_interrupt_callback_t interrupt_callback; @@ -479,8 +484,10 @@ typedef enum __attribute__ ((__packed__)) { int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTEXTS_V2_OPTIONS options); RRDCONTEXT_TO_JSON_OPTIONS rrdcontext_to_json_parse_options(char *o); -void buffer_json_agents_array_v2(BUFFER *wb, time_t now_s); +void buffer_json_agents_array_v2(BUFFER *wb, struct query_timings *timings, time_t now_s); void buffer_json_node_add_v2(BUFFER *wb, RRDHOST *host, size_t ni, usec_t duration_ut); +void buffer_json_query_timings(BUFFER *wb, const char *key, struct query_timings *timings); +void buffer_json_cloud_timings(BUFFER *wb, const char *key, struct query_timings *timings); // ---------------------------------------------------------------------------- // scope @@ -515,5 +522,29 @@ bool rrdcontext_retention_match(RRDCONTEXT_ACQUIRED *rca, time_t after, time_t b (((first_entry_s) - ((update_every_s) * 2) <= (before)) && \ ((last_entry_s) + ((update_every_s) * 2) >= (after))) +#define query_target_aggregatable(qt) ((qt)->window.options & RRDR_OPTION_RETURN_RAW) + +static inline bool query_target_has_percentage_of_instance(QUERY_TARGET *qt) { + for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES ;g++) + if(qt->request.group_by[g].group_by & 
RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE) + return true; + + return false; +} + +static inline bool query_target_needs_all_dimensions(QUERY_TARGET *qt) { + if(qt->request.options & RRDR_OPTION_PERCENTAGE) + return true; + + return query_target_has_percentage_of_instance(qt); +} + +static inline bool query_target_has_percentage_units(QUERY_TARGET *qt) { + if(qt->window.time_group_method == RRDR_GROUPING_CV || query_target_needs_all_dimensions(qt)) + return true; + + return false; +} + #endif // NETDATA_RRDCONTEXT_H diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 9606632ab5..02d08a1647 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -769,7 +769,10 @@ inline void rrdeng_prep_wait(PDC *pdc) { } } -void rrdeng_prep_query(PDC *pdc) { +void rrdeng_prep_query(struct page_details_control *pdc, bool worker) { + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_QUERY); + size_t pages_to_load = 0; pdc->page_list_JudyL = get_page_list(pdc->ctx, pdc->metric, pdc->start_time_s * USEC_PER_SEC, @@ -792,6 +795,9 @@ void rrdeng_prep_query(PDC *pdc) { completion_mark_complete(&pdc->prep_completion); pdc_release_and_destroy_if_unreferenced(pdc, true, true); + + if(worker) + worker_is_idle(); } /** @@ -824,7 +830,7 @@ void pg_cache_preload(struct rrdeng_query_handle *handle) { handle->pdc->refcount++; // we get 1 for the query thread and 1 for the prep thread if(unlikely(handle->pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS)) - rrdeng_prep_query(handle->pdc); + rrdeng_prep_query(handle->pdc, false); else rrdeng_enq_cmd(handle->ctx, RRDENG_OPCODE_QUERY, handle->pdc, NULL, handle->priority, NULL, NULL); } diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index ac7202f440..5242db89e5 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -52,7 +52,7 @@ struct rrdeng_query_handle; struct page_details_control; void rrdeng_prep_wait(struct page_details_control *pdc); -void rrdeng_prep_query(struct 
page_details_control *pdc); +void rrdeng_prep_query(struct page_details_control *pdc, bool worker); void pg_cache_preload(struct rrdeng_query_handle *handle); struct pgc_page *pg_cache_lookup_next(struct rrdengine_instance *ctx, struct page_details_control *pdc, time_t now_s, time_t last_update_every_s, size_t *entries); void pgc_and_mrg_initialize(void); diff --git a/database/engine/pdc.c b/database/engine/pdc.c index a29122cf54..42fb2f6de5 100644 --- a/database/engine/pdc.c +++ b/database/engine/pdc.c @@ -1151,6 +1151,9 @@ static inline void datafile_extent_read_free(void *buffer) { } void epdl_find_extent_and_populate_pages(struct rrdengine_instance *ctx, EPDL *epdl, bool worker) { + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP); + size_t *statistics_counter = NULL; PDC_PAGE_STATUS not_loaded_pages_tag = 0, loaded_pages_tag = 0; @@ -1173,9 +1176,6 @@ void epdl_find_extent_and_populate_pages(struct rrdengine_instance *ctx, EPDL *e goto cleanup; } - if(worker) - worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP); - bool extent_found_in_cache = false; void *extent_compressed_data = NULL; diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index fae6656d93..7811a5eaaf 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -16,6 +16,24 @@ unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT; #error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2) #endif +struct rrdeng_cmd { + struct rrdengine_instance *ctx; + enum rrdeng_opcode opcode; + void *data; + struct completion *completion; + enum storage_priority priority; + dequeue_callback_t dequeue_cb; + + struct { + struct rrdeng_cmd *prev; + struct rrdeng_cmd *next; + } queue; +}; + +static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker); +static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker); +static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool 
from_worker); + struct rrdeng_main { uv_thread_t thread; uv_loop_t loop; @@ -45,7 +63,6 @@ struct rrdeng_main { struct { size_t dispatched; size_t executing; - size_t pending_cb; } atomics; } work_cmd; @@ -132,8 +149,22 @@ static void work_request_init(void) { ); } -static inline bool work_request_full(void) { - return __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED) >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS); +enum LIBUV_WORKERS_STATUS { + LIBUV_WORKERS_RELAXED, + LIBUV_WORKERS_STRESSED, + LIBUV_WORKERS_CRITICAL, +}; + +static inline enum LIBUV_WORKERS_STATUS work_request_full(void) { + size_t dispatched = __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED); + + if(dispatched >= (size_t)(libuv_worker_threads)) + return LIBUV_WORKERS_CRITICAL; + + else if(dispatched >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS)) + return LIBUV_WORKERS_STRESSED; + + return LIBUV_WORKERS_RELAXED; } static inline void work_done(struct rrdeng_work *work_request) { @@ -147,12 +178,38 @@ static void work_standard_worker(uv_work_t *req) { worker_is_busy(UV_EVENT_WORKER_INIT); struct rrdeng_work *work_request = req->data; + work_request->data = work_request->work_cb(work_request->ctx, work_request->data, work_request->completion, req); worker_is_idle(); + if(work_request->opcode == RRDENG_OPCODE_EXTENT_READ || work_request->opcode == RRDENG_OPCODE_QUERY) { + internal_fatal(work_request->after_work_cb != NULL, "DBENGINE: opcodes with a callback should not boosted"); + + while(1) { + struct rrdeng_cmd cmd = rrdeng_deq_cmd(true); + if (cmd.opcode == RRDENG_OPCODE_NOOP) + break; + + worker_is_busy(UV_EVENT_WORKER_INIT); + switch (cmd.opcode) { + case RRDENG_OPCODE_EXTENT_READ: + worker_dispatch_extent_read(cmd, true); + break; + + case RRDENG_OPCODE_QUERY: + worker_dispatch_query_prep(cmd, true); + break; + + default: + fatal("DBENGINE: Opcode should not be executed synchronously"); + break; + } + 
worker_is_idle(); + } + } + __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED); __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED); // signal the event loop a worker is available fatal_assert(0 == uv_async_send(&rrdeng_main.async)); @@ -167,7 +224,6 @@ static void after_work_standard_callback(uv_work_t* req, int status) { work_request->after_work_cb(work_request->ctx, work_request->data, work_request->completion, req, status); work_done(work_request); - __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED); worker_is_idle(); } @@ -369,20 +425,6 @@ void wal_release(WAL *wal) { // ---------------------------------------------------------------------------- // command queue cache -struct rrdeng_cmd { - struct rrdengine_instance *ctx; - enum rrdeng_opcode opcode; - void *data; - struct completion *completion; - enum storage_priority priority; - dequeue_callback_t dequeue_cb; - - struct { - struct rrdeng_cmd *prev; - struct rrdeng_cmd *next; - } queue; -}; - static void rrdeng_cmd_queue_init(void) { rrdeng_main.cmd_queue.ar = aral_create("dbengine-opcodes", sizeof(struct rrdeng_cmd), @@ -465,14 +507,33 @@ static inline bool rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(STORAGE_PR return false; } -static inline struct rrdeng_cmd rrdeng_deq_cmd(void) { +#define opcode_empty (struct rrdeng_cmd) { \ + .ctx = NULL, \ + .opcode = RRDENG_OPCODE_NOOP, \ + .priority = STORAGE_PRIORITY_BEST_EFFORT, \ + .completion = NULL, \ + .data = NULL, \ +} + +static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) { struct rrdeng_cmd *cmd = NULL; + enum LIBUV_WORKERS_STATUS status = work_request_full(); - STORAGE_PRIORITY max_priority = work_request_full() ? 
STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1; + STORAGE_PRIORITY min_priority, max_priority; + min_priority = STORAGE_PRIORITY_INTERNAL_DBENGINE; + max_priority = (status != LIBUV_WORKERS_RELAXED) ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1; + + if(from_worker) { + if(status == LIBUV_WORKERS_CRITICAL) + return opcode_empty; + + min_priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP; + max_priority = STORAGE_PRIORITY_BEST_EFFORT; + } // find an opcode to execute from the queue netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); - for(STORAGE_PRIORITY priority = STORAGE_PRIORITY_INTERNAL_DBENGINE; priority <= max_priority ; priority++) { + for(STORAGE_PRIORITY priority = min_priority; priority <= max_priority ; priority++) { cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority]; if(cmd) { @@ -508,13 +569,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(void) { aral_freez(rrdeng_main.cmd_queue.ar, cmd); } else - ret = (struct rrdeng_cmd) { - .ctx = NULL, - .opcode = RRDENG_OPCODE_NOOP, - .priority = STORAGE_PRIORITY_BEST_EFFORT, - .completion = NULL, - .data = NULL, - }; + ret = opcode_empty; return ret; } @@ -1353,14 +1408,9 @@ static void *cache_evict_tp_worker(struct rrdengine_instance *ctx __maybe_unused return data; } -static void after_prep_query(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { - ; -} - static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) { - worker_is_busy(UV_EVENT_DBENGINE_QUERY); PDC *pdc = data; - rrdeng_prep_query(pdc); + rrdeng_prep_query(pdc, true); return data; } @@ -1484,10 +1534,6 @@ static void after_do_cache_evict(struct rrdengine_instance *ctx __maybe_unused, 
rrdeng_main.evictions_running--; } -static void after_extent_read(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { - ; -} - static void after_journal_v2_indexing(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { __atomic_store_n(&ctx->atomic.migration_to_v2_running, false, __ATOMIC_RELAXED); rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); @@ -1616,6 +1662,26 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) { return true; } +static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker) { + struct rrdengine_instance *ctx = cmd.ctx; + EPDL *epdl = cmd.data; + + if(from_worker) + epdl_find_extent_and_populate_pages(ctx, epdl, true); + else + work_dispatch(ctx, epdl, NULL, cmd.opcode, extent_read_tp_worker, NULL); +} + +static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker) { + struct rrdengine_instance *ctx = cmd.ctx; + PDC *pdc = cmd.data; + + if(from_worker) + rrdeng_prep_query(pdc, true); + else + work_dispatch(ctx, pdc, NULL, cmd.opcode, query_prep_tp_worker, NULL); +} + void dbengine_event_loop(void* arg) { sanity_check(); uv_thread_set_name_np(pthread_self(), "DBENGINE"); @@ -1673,25 +1739,19 @@ void dbengine_event_loop(void* arg) { /* wait for commands */ do { worker_is_busy(RRDENG_OPCODE_MAX); - cmd = rrdeng_deq_cmd(); + cmd = rrdeng_deq_cmd(RRDENG_OPCODE_NOOP); opcode = cmd.opcode; worker_is_busy(opcode); switch (opcode) { - case RRDENG_OPCODE_EXTENT_READ: { - struct rrdengine_instance *ctx = cmd.ctx; - EPDL *epdl = cmd.data; - work_dispatch(ctx, epdl, NULL, opcode, extent_read_tp_worker, after_extent_read); + case RRDENG_OPCODE_EXTENT_READ: + 
worker_dispatch_extent_read(cmd, false); break; - } - case RRDENG_OPCODE_QUERY: { - struct rrdengine_instance *ctx = cmd.ctx; - PDC *pdc = cmd.data; - work_dispatch(ctx, pdc, NULL, opcode, query_prep_tp_worker, after_prep_query); + case RRDENG_OPCODE_QUERY: + worker_dispatch_query_prep(cmd, false); break; - } case RRDENG_OPCODE_EXTENT_WRITE: { struct rrdengine_instance *ctx = cmd.ctx; diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index 9e52764d54..69e4123544 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -181,14 +181,17 @@ typedef enum __attribute__ ((__packed__)) { } RRDENG_COLLECT_PAGE_FLAGS; struct rrdeng_collect_handle { + struct storage_collect_handle common; // has to be first item + + RRDENG_COLLECT_PAGE_FLAGS page_flags; + RRDENG_COLLECT_HANDLE_OPTIONS options; + uint8_t type; + struct metric *metric; struct pgc_page *page; void *data; size_t data_size; struct pg_alignment *alignment; - RRDENG_COLLECT_HANDLE_OPTIONS options; - uint8_t type; - RRDENG_COLLECT_PAGE_FLAGS page_flags; uint32_t page_entries_max; uint32_t page_position; // keep track of the current page size, to make sure we don't exceed it usec_t page_start_time_ut; diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index b06f536c4b..ddc306ed74 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -255,6 +255,7 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri struct rrdeng_collect_handle *handle; handle = callocz(1, sizeof(struct rrdeng_collect_handle)); + handle->common.backend = STORAGE_ENGINE_BACKEND_DBENGINE; handle->metric = metric; handle->page = NULL; handle->data = NULL; @@ -774,6 +775,7 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, rrddim_handle->start_time_s = handle->start_time_s; rrddim_handle->end_time_s = handle->end_time_s; rrddim_handle->priority = priority; + rrddim_handle->backend = 
STORAGE_ENGINE_BACKEND_DBENGINE; pg_cache_preload(handle); @@ -789,6 +791,7 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, rrddim_handle->start_time_s = handle->start_time_s; rrddim_handle->end_time_s = 0; rrddim_handle->priority = priority; + rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE; } } diff --git a/database/ram/rrddim_mem.c b/database/ram/rrddim_mem.c index 0f17d6cb9e..a417c5ae3e 100644 --- a/database/ram/rrddim_mem.c +++ b/database/ram/rrddim_mem.c @@ -143,6 +143,7 @@ STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_han internal_fatal((uint32_t)mh->update_every_s != update_every, "RRDDIM: update requested does not match the dimension"); struct mem_collect_handle *ch = callocz(1, sizeof(struct mem_collect_handle)); + ch->common.backend = STORAGE_ENGINE_BACKEND_RRDDIM; ch->rd = rd; ch->db_metric_handle = db_metric_handle; @@ -204,7 +205,7 @@ static inline void rrddim_fill_the_gap(STORAGE_COLLECT_HANDLE *collection_handle void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, - NETDATA_DOUBLE number, + NETDATA_DOUBLE n, NETDATA_DOUBLE min_value __maybe_unused, NETDATA_DOUBLE max_value __maybe_unused, uint16_t count __maybe_unused, @@ -226,7 +227,7 @@ void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *collection_handle, if(unlikely(mh->last_updated_s && point_in_time_s - mh->update_every_s > mh->last_updated_s)) rrddim_fill_the_gap(collection_handle, point_in_time_s); - rd->db[mh->current_entry] = pack_storage_number(number, flags); + rd->db[mh->current_entry] = pack_storage_number(n, flags); mh->counter++; mh->current_entry = (mh->current_entry + 1) >= mh->entries ? 
0 : mh->current_entry + 1; mh->last_updated_s = point_in_time_s; @@ -340,6 +341,7 @@ void rrddim_query_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_e handle->start_time_s = start_time_s; handle->end_time_s = end_time_s; handle->priority = priority; + handle->backend = STORAGE_ENGINE_BACKEND_RRDDIM; struct mem_query_handle* h = mallocz(sizeof(struct mem_query_handle)); h->db_metric_handle = db_metric_handle; diff --git a/database/ram/rrddim_mem.h b/database/ram/rrddim_mem.h index 373a2bd7b9..a75814a0be 100644 --- a/database/ram/rrddim_mem.h +++ b/database/ram/rrddim_mem.h @@ -6,6 +6,8 @@ #include "database/rrd.h" struct mem_collect_handle { + struct storage_collect_handle common; // has to be first item + STORAGE_METRIC_HANDLE *db_metric_handle; RRDDIM *rd; }; @@ -32,7 +34,7 @@ void rrddim_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); void rrddim_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); -void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE number, +void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, uint16_t count, diff --git a/database/rrd.h b/database/rrd.h index 8bca004b79..d1fbca66b0 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -109,12 +109,20 @@ RRD_MEMORY_MODE rrd_memory_mode_id(const char *name); typedef struct storage_query_handle STORAGE_QUERY_HANDLE; +typedef enum __attribute__ ((__packed__)) { + STORAGE_ENGINE_BACKEND_RRDDIM = 1, + STORAGE_ENGINE_BACKEND_DBENGINE = 2, +} STORAGE_ENGINE_BACKEND; + +#define is_valid_backend(backend) ((backend) >= STORAGE_ENGINE_BACKEND_RRDDIM && (backend) <= STORAGE_ENGINE_BACKEND_DBENGINE) + // iterator 
state for RRD dimension data queries struct storage_engine_query_handle { time_t start_time_s; time_t end_time_s; STORAGE_PRIORITY priority; - STORAGE_QUERY_HANDLE* handle; + STORAGE_ENGINE_BACKEND backend; + STORAGE_QUERY_HANDLE *handle; }; // ---------------------------------------------------------------------------- @@ -162,11 +170,11 @@ extern time_t rrdset_free_obsolete_time_s; #if defined(ENV32BIT) #define MIN_LIBUV_WORKER_THREADS 8 -#define MAX_LIBUV_WORKER_THREADS 64 +#define MAX_LIBUV_WORKER_THREADS 128 #define RESERVED_LIBUV_WORKER_THREADS 3 #else #define MIN_LIBUV_WORKER_THREADS 16 -#define MAX_LIBUV_WORKER_THREADS 128 +#define MAX_LIBUV_WORKER_THREADS 1024 #define RESERVED_LIBUV_WORKER_THREADS 6 #endif @@ -301,19 +309,20 @@ bool exporting_labels_filter_callback(const char *name, const char *value, RRDLA // ---------------------------------------------------------------------------- // engine-specific iterator state for dimension data collection -typedef struct storage_collect_handle STORAGE_COLLECT_HANDLE; +typedef struct storage_collect_handle { + STORAGE_ENGINE_BACKEND backend; +} STORAGE_COLLECT_HANDLE; // ---------------------------------------------------------------------------- // Storage tier data for every dimension struct rrddim_tier { STORAGE_POINT virtual_point; + STORAGE_ENGINE_BACKEND backend; size_t tier_grouping; time_t next_point_end_time_s; STORAGE_METRIC_HANDLE *db_metric_handle; // the metric handle inside the database STORAGE_COLLECT_HANDLE *db_collection_handle; // the data collection handle - struct storage_engine_collect_ops *collect_ops; - struct storage_engine_query_ops *query_ops; }; void rrdr_fill_tier_gap_from_smaller_tiers(RRDDIM *rd, size_t tier, time_t now_s); @@ -412,56 +421,214 @@ size_t rrddim_memory_file_header_size(void); void rrddim_memory_file_save(RRDDIM *rd); // ------------------------------------------------------------------------ -// function pointers that handle data collection -struct 
storage_engine_collect_ops { - // an initialization function to run before starting collection - STORAGE_COLLECT_HANDLE *(*init)(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); +// DATA COLLECTION STORAGE OPS - // run this to store each metric into the database - void (*store_metric)(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number, NETDATA_DOUBLE min_value, - NETDATA_DOUBLE max_value, uint16_t count, uint16_t anomaly_count, SN_FLAGS flags); +STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +STORAGE_METRICS_GROUP *rrddim_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); +static inline STORAGE_METRICS_GROUP *storage_engine_metrics_group_get(STORAGE_ENGINE_BACKEND backend, STORAGE_INSTANCE *db_instance, uuid_t *uuid) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); - // run this to flush / reset the current data collection sequence - void (*flush)(STORAGE_COLLECT_HANDLE *collection_handle); +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_metrics_group_get(db_instance, uuid); +#endif + return rrddim_metrics_group_get(db_instance, uuid); +} - // a finalization function to run after collection is over - // returns 1 if it's safe to delete the dimension - int (*finalize)(STORAGE_COLLECT_HANDLE *collection_handle); +void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); +void rrddim_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg); +static inline void storage_engine_metrics_group_release(STORAGE_ENGINE_BACKEND backend, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); - void (*change_collection_frequency)(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); +#ifdef ENABLE_DBENGINE + 
if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_metrics_group_release(db_instance, smg); + else +#endif + rrddim_metrics_group_release(db_instance, smg); +} + +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); +STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); +static inline STORAGE_COLLECT_HANDLE *storage_metric_store_init(STORAGE_ENGINE_BACKEND backend, STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) { + internal_fatal(!is_valid_backend(backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_store_metric_init(db_metric_handle, update_every, smg); +#endif + return rrddim_collect_init(db_metric_handle, update_every, smg); +} + +void rrdeng_store_metric_next( + STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, + NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, + uint16_t count, uint16_t anomaly_count, SN_FLAGS flags); + +void rrddim_collect_store_metric( + STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, + NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, + uint16_t count, uint16_t anomaly_count, SN_FLAGS flags); + +static inline void storage_engine_store_metric( + STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, + NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, + uint16_t count, uint16_t anomaly_count, SN_FLAGS flags) { + internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_store_metric_next(collection_handle, point_in_time_ut, + n, min_value, max_value, + count, anomaly_count, 
flags); +#endif + return rrddim_collect_store_metric(collection_handle, point_in_time_ut, + n, min_value, max_value, + count, anomaly_count, flags); +} + +void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle); +void rrddim_store_metric_flush(STORAGE_COLLECT_HANDLE *collection_handle); +static inline void storage_engine_store_flush(STORAGE_COLLECT_HANDLE *collection_handle) { + if(unlikely(!collection_handle)) + return; + + internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + rrdeng_store_metric_flush_current_page(collection_handle); + else +#endif + rrddim_store_metric_flush(collection_handle); +} + +int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle); +int rrddim_collect_finalize(STORAGE_COLLECT_HANDLE *collection_handle); +// a finalization function to run after collection is over +// returns 1 if it's safe to delete the dimension +static inline int storage_engine_store_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { + internal_fatal(!is_valid_backend(collection_handle->backend), "STORAGE: invalid backend"); + +#ifdef ENABLE_DBENGINE + if(likely(collection_handle->backend == STORAGE_ENGINE_BACKEND_DBENGINE)) + return rrdeng_store_metric_finalize(collection_handle); +#endif + + return rrddim_collect_finalize(collection_handle); +} + +void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); +void rrddim_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); +static inline void storage_engine_store_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) { + internal_fatal(!is_