diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-04-07 21:25:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-07 21:25:01 +0300 |
commit | 204dd9ae272445d13f308badb07e99675fa34892 (patch) | |
tree | f42e873c60219b5031dcfc3e076adb2398cdb3fe | |
parent | 61925baaf6e2448c641e8e71248a47f7a01c4efd (diff) |
Boost dbengine (#14832)
* configure extent cache size
* workers can now execute up to 10 jobs in a run, boosting query prep and extent reads
* fix dispatched and executing counters
* boost to the max
* increase libuv worker threads
* query prep always gets more prio than extent reads; stop processing in batch when the dbengine queue is critical
* fix accounting of query prep
* inlining of time-grouping functions, to speed up queries with billions of points
* make switching based on a local const variable
* print one pending contexts loading message per iteration
* inlined store engine query API
* inlined storage engine data collection api
* inlined all storage engine query ops
* eliminate and inline data collection ops
* simplified query group-by
* more error handling
* optimized partial trimming of group-by queries
* preparative work to support multiple passes of group-by
* more preparative work to support multiple passes of group-by (accepts multiple group-by params)
* unified query timings
* unified query timings - weights endpoint
* query target is no longer a static thread variable - there is a list of cached query targets, each of which is freed every 1000 queries
* fix query memory accounting
* added summary.dimension[].pri and sorted summary.dimensions based on priority and then name
* limit max ACLK WEB response size to 30MB
* the response type should be text/plain
* more preparative work for multiple group-by passes
* create functions for generating group by keys, ids and names
* multiple group-by passes are now supported
* parse group-by options array also with an index
* implemented percentage-of-instance group by function
* family is now merged in multi-node contexts
* prevent uninitialized use
59 files changed, 3112 insertions, 2417 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index 6ad2c43b70..1d00cdca3e 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -5,6 +5,7 @@ #include "aclk_tx_msgs.h" #define WEB_HDR_ACCEPT_ENC "Accept-Encoding:" +#define ACLK_MAX_WEB_RESPONSE_SIZE (30 * 1024 * 1024) pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER; pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER; @@ -22,6 +23,13 @@ static usec_t aclk_web_api_request(RRDHOST *host, struct web_client *w, char *ur else w->response.code = web_client_api_request_v1(host, w, url); + if(buffer_strlen(w->response.data) > ACLK_MAX_WEB_RESPONSE_SIZE) { + buffer_flush(w->response.data); + buffer_strcat(w->response.data, "response is too big"); + w->response.data->content_type = CT_TEXT_PLAIN; + w->response.code = HTTP_RESP_CONTENT_TOO_LONG; + } + t = now_monotonic_high_precision_usec() - t; if (aclk_stats_enabled) { diff --git a/daemon/main.c b/daemon/main.c index 832a59b955..682106b78e 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1834,7 +1834,7 @@ int main(int argc, char **argv) { #endif // set libuv worker threads - libuv_worker_threads = (int)get_netdata_cpus() * 2; + libuv_worker_threads = (int)get_netdata_cpus() * 6; if(libuv_worker_threads < MIN_LIBUV_WORKER_THREADS) libuv_worker_threads = MIN_LIBUV_WORKER_THREADS; diff --git a/daemon/service.c b/daemon/service.c index 9761abd02d..57c7c7f398 100644 --- a/daemon/service.c +++ b/daemon/service.c @@ -55,7 +55,7 @@ static void svc_rrddim_obsolete_to_archive(RRDDIM *rd) { if(rd->tiers[tier].db_collection_handle) { tiers_available++; - if(rd->tiers[tier].collect_ops->finalize(rd->tiers[tier].db_collection_handle)) + if(storage_engine_store_finalize(rd->tiers[tier].db_collection_handle)) tiers_said_no_retention++; rd->tiers[tier].db_collection_handle = NULL; diff --git a/daemon/unit_test.c b/daemon/unit_test.c index 4dd4b29931..fa3fa847f2 100644 --- a/daemon/unit_test.c +++ b/daemon/unit_test.c @@ -1937,7 +1937,7 @@ static time_t 
test_dbengine_create_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS // feed it with the test data for (i = 0 ; i < CHARTS ; ++i) { for (j = 0 ; j < DIMS ; ++j) { - rd[i][j]->tiers[0].collect_ops->change_collection_frequency(rd[i][j]->tiers[0].db_collection_handle, update_every); + storage_engine_store_change_collection_frequency(rd[i][j]->tiers[0].db_collection_handle, update_every); rd[i][j]->last_collected_time.tv_sec = st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = time_now; @@ -1988,13 +1988,13 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI time_now = time_start + (c + 1) * update_every; for (i = 0 ; i < CHARTS ; ++i) { for (j = 0; j < DIMS; ++j) { - rd[i][j]->tiers[0].query_ops->init(rd[i][j]->tiers[0].db_metric_handle, &handle, time_now, time_now + QUERY_BATCH * update_every, STORAGE_PRIORITY_NORMAL); + storage_engine_query_init(rd[i][j]->tiers[0].backend, rd[i][j]->tiers[0].db_metric_handle, &handle, time_now, time_now + QUERY_BATCH * update_every, STORAGE_PRIORITY_NORMAL); for (k = 0; k < QUERY_BATCH; ++k) { last = ((collected_number)i * DIMS) * REGION_POINTS[current_region] + j * REGION_POINTS[current_region] + c + k; expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE)last, SN_DEFAULT_FLAGS)); - STORAGE_POINT sp = rd[i][j]->tiers[0].query_ops->next_metric(&handle); + STORAGE_POINT sp = storage_engine_query_next_metric(&handle); value = sp.sum; time_retrieved = sp.start_time_s; end_time = sp.end_time_s; @@ -2016,7 +2016,7 @@ static int test_dbengine_check_metrics(RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DI errors++; } } - rd[i][j]->tiers[0].query_ops->finalize(&handle); + storage_engine_query_finalize(&handle); } } } @@ -2444,13 +2444,13 @@ static void query_dbengine_chart(void *arg) time_before = MIN(time_after + duration, time_max); /* up to 1 hour queries */ } - rd->tiers[0].query_ops->init(rd->tiers[0].db_metric_handle, &handle, time_after, time_before, STORAGE_PRIORITY_NORMAL); + 
storage_engine_query_init(rd->tiers[0].backend, rd->tiers[0].db_metric_handle, &handle, time_after, time_before, STORAGE_PRIORITY_NORMAL); ++thread_info->queries_nr; for (time_now = time_after ; time_now <= time_before ; time_now += update_every) { generatedv = generate_dbengine_chart_value(i, j, time_now); expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE) generatedv, SN_DEFAULT_FLAGS)); - if (unlikely(rd->tiers[0].query_ops->is_finished(&handle))) { + if (unlikely(storage_engine_query_is_finished(&handle))) { if (!thread_info->delete_old_data) { /* data validation only when we don't delete */ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT ", found data gap, ### E R R O R ###\n", @@ -2460,7 +2460,7 @@ static void query_dbengine_chart(void *arg) break; } - STORAGE_POINT sp = rd->tiers[0].query_ops->next_metric(&handle); + STORAGE_POINT sp = storage_engine_query_next_metric(&handle); value = sp.sum; time_retrieved = sp.start_time_s; end_time = sp.end_time_s; @@ -2498,7 +2498,7 @@ static void query_dbengine_chart(void *arg) } } } - rd->tiers[0].query_ops->finalize(&handle); + storage_engine_query_finalize(&handle); } while(!thread_info->done); if(value_errors) diff --git a/database/contexts/api_v2.c b/database/contexts/api_v2.c index 6d5d104994..0d753b8789 100644 --- a/database/contexts/api_v2.c +++ b/database/contexts/api_v2.c @@ -112,6 +112,8 @@ struct rrdcontext_to_json_v2_data { SIMPLE_PATTERN *pattern; FTS_INDEX fts; } q; + + struct query_timings timings; }; static FTS_MATCH rrdcontext_to_json_v2_full_text_search(struct rrdcontext_to_json_v2_data *ctl, RRDCONTEXT *rc, SIMPLE_PATTERN *q) { @@ -194,7 +196,7 @@ static ssize_t rrdcontext_to_json_v2_add_context(void *data, RRDCONTEXT_ACQUIRED struct rrdcontext_to_json_v2_entry t = { .count = 0, .id = rc->id, - .family = rc->family, + .family = string_dup(rc->family), .priority = rc->priority, .first_time_s = rc->first_time_s, .last_time_s = 
rc->last_time_s, @@ -219,6 +221,10 @@ static ssize_t rrdcontext_to_json_v2_add_context(void *data, RRDCONTEXT_ACQUIRED if(z->last_time_s < rc->last_time_s) z->last_time_s = rc->last_time_s; + + if(z->family != rc->family) { + z->family = string_2way_merge(z->family, rc->family); + } } return 1; @@ -248,7 +254,7 @@ static ssize_t rrdcontext_to_json_v2_add_host(void *data, RRDHOST *host, bool qu struct rrdcontext_to_json_v2_data *ctl = data; BUFFER *wb = ctl->wb; - if(ctl->request->timeout_ms && now_monotonic_usec() > ctl->request->timings.received_ut + ctl->request->timeout_ms * USEC_PER_MS) + if(ctl->request->timeout_ms && now_monotonic_usec() > ctl->timings.received_ut + ctl->request->timeout_ms * USEC_PER_MS) // timed out return -2; @@ -384,7 +390,22 @@ static void buffer_json_contexts_v2_options_to_array(BUFFER *wb, CONTEXTS_V2_OPT buffer_json_add_array_item_string(wb, "search"); } -void buffer_json_agents_array_v2(BUFFER *wb, time_t now_s) { +void buffer_json_query_timings(BUFFER *wb, const char *key, struct query_timings *timings) { + timings->finished_ut = now_monotonic_usec(); + if(!timings->executed_ut) + timings->executed_ut = timings->finished_ut; + if(!timings->preprocessed_ut) + timings->preprocessed_ut = timings->received_ut; + buffer_json_member_add_object(wb, key); + buffer_json_member_add_double(wb, "prep_ms", (NETDATA_DOUBLE)(timings->preprocessed_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "query_ms", (NETDATA_DOUBLE)(timings->executed_ut - timings->preprocessed_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "output_ms", (NETDATA_DOUBLE)(timings->finished_ut - timings->executed_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "total_ms", (NETDATA_DOUBLE)(timings->finished_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "cloud_ms", (NETDATA_DOUBLE)(timings->finished_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_object_close(wb); +} + +void 
buffer_json_agents_array_v2(BUFFER *wb, struct query_timings *timings, time_t now_s) { if(!now_s) now_s = now_realtime_sec(); @@ -395,15 +416,30 @@ void buffer_json_agents_array_v2(BUFFER *wb, time_t now_s) { buffer_json_member_add_string(wb, "nm", rrdhost_hostname(localhost)); buffer_json_member_add_time_t(wb, "now", now_s); buffer_json_member_add_uint64(wb, "ai", 0); + + if(timings) + buffer_json_query_timings(wb, "timings", timings); + buffer_json_object_close(wb); buffer_json_array_close(wb); } +void buffer_json_cloud_timings(BUFFER *wb, const char *key, struct query_timings *timings) { + buffer_json_member_add_object(wb, key); + buffer_json_member_add_double(wb, "routing_ms", 0.0); + buffer_json_member_add_double(wb, "node_max_ms", 0.0); + buffer_json_member_add_double(wb, "total_ms", (NETDATA_DOUBLE)(timings->finished_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_object_close(wb); +} + +void contexts_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { + struct rrdcontext_to_json_v2_entry *z = value; + string_freez(z->family); +} + int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTEXTS_V2_OPTIONS options) { int resp = HTTP_RESP_OK; - req->timings.processing_ut = now_monotonic_usec(); - if(options & CONTEXTS_V2_SEARCH) options |= CONTEXTS_V2_CONTEXTS; @@ -418,15 +454,22 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE .contexts.pattern = string_to_simple_pattern(req->contexts), .contexts.scope_pattern = string_to_simple_pattern(req->scope_contexts), .q.pattern = string_to_simple_pattern_nocase(req->q), + .timings = { + .received_ut = now_monotonic_usec(), + } }; - if(options & CONTEXTS_V2_CONTEXTS) - ctl.ctx = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, sizeof(struct rrdcontext_to_json_v2_entry)); + if(options & CONTEXTS_V2_CONTEXTS) { + ctl.ctx = 
dictionary_create_advanced( + DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, + sizeof(struct rrdcontext_to_json_v2_entry)); + + dictionary_register_delete_callback(ctl.ctx, contexts_delete_callback, NULL); + } time_t now_s = now_realtime_sec(); buffer_json_initialize(wb, "\"", "\"", 0, true, false); buffer_json_member_add_uint64(wb, "api", 2); - buffer_json_agents_array_v2(wb, now_s); if(options & CONTEXTS_V2_DEBUG) { buffer_json_member_add_object(wb, "request"); @@ -473,7 +516,7 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE if(options & (CONTEXTS_V2_NODES | CONTEXTS_V2_NODES_DETAILED | CONTEXTS_V2_DEBUG)) buffer_json_array_close(wb); - req->timings.output_ut = now_monotonic_usec(); + ctl.timings.executed_ut = now_monotonic_usec(); version_hashes_api_v2(wb, &ctl.versions); if(options & CONTEXTS_V2_CONTEXTS) { @@ -506,13 +549,8 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE buffer_json_object_close(wb); } - req->timings.finished_ut = now_monotonic_usec(); - buffer_json_member_add_object(wb, "timings"); - buffer_json_member_add_double(wb, "prep_ms", (NETDATA_DOUBLE)(req->timings.processing_ut - req->timings.received_ut) / USEC_PER_MS); - buffer_json_member_add_double(wb, "query_ms", (NETDATA_DOUBLE)(req->timings.output_ut - req->timings.processing_ut) / USEC_PER_MS); - buffer_json_member_add_double(wb, "output_ms", (NETDATA_DOUBLE)(req->timings.finished_ut - req->timings.output_ut) / USEC_PER_MS); - buffer_json_member_add_double(wb, "total_ms", (NETDATA_DOUBLE)(req->timings.finished_ut - req->timings.received_ut) / USEC_PER_MS); - buffer_json_object_close(wb); + buffer_json_agents_array_v2(wb, &ctl.timings, now_s); + buffer_json_cloud_timings(wb, "timings", &ctl.timings); buffer_json_finalize(wb); cleanup: diff --git a/database/contexts/query_target.c b/database/contexts/query_target.c index 0378f52017..19ab8caadd 100644 --- 
a/database/contexts/query_target.c +++ b/database/contexts/query_target.c @@ -12,42 +12,55 @@ static void query_instance_release(QUERY_INSTANCE *qi); static void query_context_release(QUERY_CONTEXT *qc); static void query_node_release(QUERY_NODE *qn); -static __thread QUERY_TARGET thread_query_target = {}; - -// ---------------------------------------------------------------------------- -// query API - -typedef struct query_target_locals { - time_t start_s; - - QUERY_TARGET *qt; +static __thread QUERY_TARGET *thread_qt = NULL; +static struct { + struct { + SPINLOCK spinlock; + size_t count; + QUERY_TARGET *base; + } available; - RRDSET *st; + struct { + SPINLOCK spinlock; + size_t count; + QUERY_TARGET *base; + } used; +} query_target_base = { + .available = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .base = NULL, + .count = 0, + }, + .used = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .base = NULL, + .count = 0, + }, +}; + +static void query_target_destroy(QUERY_TARGET *qt) { + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->query.size * sizeof(*qt->query.array), __ATOMIC_RELAXED); + freez(qt->query.array); - const char *scope_nodes; - const char *scope_contexts; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->dimensions.size * sizeof(*qt->dimensions.array), __ATOMIC_RELAXED); + freez(qt->dimensions.array); - const char *nodes; - const char *contexts; - const char *instances; - const char *dimensions; - const char *chart_label_key; - const char *labels; - const char *alerts; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->instances.size * sizeof(*qt->instances.array), __ATOMIC_RELAXED); + freez(qt->instances.array); - long long after; - long long before; - bool match_ids; - bool match_names; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->contexts.size * sizeof(*qt->contexts.array), __ATOMIC_RELAXED); + freez(qt->contexts.array); - size_t 
metrics_skipped_due_to_not_matching_timeframe; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->nodes.size * sizeof(*qt->nodes.array), __ATOMIC_RELAXED); + freez(qt->nodes.array); - char host_uuid_buffer[UUID_STR_LEN]; - QUERY_NODE *qn; // temp to pass on callbacks, ignore otherwise - no need to free -} QUERY_TARGET_LOCALS; + freez(qt); +} void query_target_release(QUERY_TARGET *qt) { - if(unlikely(!qt || !qt->used)) return; + if(unlikely(!qt)) return; + + internal_fatal(!qt->internal.used, "QUERY TARGET: qt to be released is not used"); simple_pattern_free(qt->nodes.scope_pattern); qt->nodes.scope_pattern = NULL; @@ -113,44 +126,91 @@ void query_target_release(QUERY_TARGET *qt) { qt->db.first_time_s = 0; qt->db.last_time_s = 0; - qt->group_by.used = 0; + for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES ;g++) + qt->group_by[g].used = 0; qt->id[0] = '\0'; - qt->used = false; + netdata_spinlock_lock(&query_target_base.used.spinlock); + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(query_target_base.used.base, qt, internal.prev, internal.next); + query_target_base.used.count--; + netdata_spinlock_unlock(&query_target_base.used.spinlock); + + qt->internal.used = false; + thread_qt = NULL; + + if (qt->internal.queries > 1000) { + query_target_destroy(qt); + } + else { + netdata_spinlock_lock(&query_target_base.available.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(query_target_base.available.base, qt, internal.prev, internal.next); + query_target_base.available.count++; + netdata_spinlock_unlock(&query_target_base.available.spinlock); + } } -void query_target_free(void) { - QUERY_TARGET *qt = &thread_query_target; - if(qt->used) - query_target_release(qt); +static QUERY_TARGET *query_target_get(void) { + netdata_spinlock_lock(&query_target_base.available.spinlock); + QUERY_TARGET *qt = query_target_base.available.base; + if (qt) { + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(query_target_base.available.base, qt, internal.prev, internal.next); + 
query_target_base.available.count--; + } + netdata_spinlock_unlock(&query_target_base.available.spinlock); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->query.size * sizeof(QUERY_METRIC), __ATOMIC_RELAXED); - freez(qt->query.array); - qt->query.array = NULL; - qt->query.size = 0; + if(unlikely(!qt)) + qt = callocz(1, sizeof(*qt)); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->dimensions.size * sizeof(RRDMETRIC_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->dimensions.array); - qt->dimensions.array = NULL; - qt->dimensions.size = 0; + netdata_spinlock_lock(&query_target_base.used.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(query_target_base.used.base, qt, internal.prev, internal.next); + query_target_base.used.count++; + netdata_spinlock_unlock(&query_target_base.used.spinlock); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->instances.array); - qt->instances.array = NULL; - qt->instances.size = 0; + qt->internal.used = true; + qt->internal.queries++; + thread_qt = qt; - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->contexts.array); - qt->contexts.array = NULL; - qt->contexts.size = 0; + return qt; +} - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->nodes.size * sizeof(RRDHOST *), __ATOMIC_RELAXED); - freez(qt->nodes.array); - qt->nodes.array = NULL; - qt->nodes.size = 0; +// this is used to release a query target from a cancelled thread |