diff options
Diffstat (limited to 'database')
-rw-r--r-- | database/contexts/api_v2.c | 70 | ||||
-rw-r--r-- | database/contexts/query_target.c | 201 | ||||
-rw-r--r-- | database/contexts/rrdcontext.h | 77 | ||||
-rw-r--r-- | database/engine/pagecache.c | 10 | ||||
-rw-r--r-- | database/engine/pagecache.h | 2 | ||||
-rw-r--r-- | database/engine/pdc.c | 6 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 160 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 9 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 3 | ||||
-rw-r--r-- | database/ram/rrddim_mem.c | 6 | ||||
-rw-r--r-- | database/ram/rrddim_mem.h | 4 | ||||
-rw-r--r-- | database/rrd.h | 258 | ||||
-rw-r--r-- | database/rrddim.c | 14 | ||||
-rw-r--r-- | database/rrdset.c | 23 | ||||
-rw-r--r-- | database/sqlite/sqlite_aclk_node.c | 7 | ||||
-rw-r--r-- | database/storage_engine.c | 54 |
16 files changed, 619 insertions, 285 deletions
diff --git a/database/contexts/api_v2.c b/database/contexts/api_v2.c index 6d5d104994..0d753b8789 100644 --- a/database/contexts/api_v2.c +++ b/database/contexts/api_v2.c @@ -112,6 +112,8 @@ struct rrdcontext_to_json_v2_data { SIMPLE_PATTERN *pattern; FTS_INDEX fts; } q; + + struct query_timings timings; }; static FTS_MATCH rrdcontext_to_json_v2_full_text_search(struct rrdcontext_to_json_v2_data *ctl, RRDCONTEXT *rc, SIMPLE_PATTERN *q) { @@ -194,7 +196,7 @@ static ssize_t rrdcontext_to_json_v2_add_context(void *data, RRDCONTEXT_ACQUIRED struct rrdcontext_to_json_v2_entry t = { .count = 0, .id = rc->id, - .family = rc->family, + .family = string_dup(rc->family), .priority = rc->priority, .first_time_s = rc->first_time_s, .last_time_s = rc->last_time_s, @@ -219,6 +221,10 @@ static ssize_t rrdcontext_to_json_v2_add_context(void *data, RRDCONTEXT_ACQUIRED if(z->last_time_s < rc->last_time_s) z->last_time_s = rc->last_time_s; + + if(z->family != rc->family) { + z->family = string_2way_merge(z->family, rc->family); + } } return 1; @@ -248,7 +254,7 @@ static ssize_t rrdcontext_to_json_v2_add_host(void *data, RRDHOST *host, bool qu struct rrdcontext_to_json_v2_data *ctl = data; BUFFER *wb = ctl->wb; - if(ctl->request->timeout_ms && now_monotonic_usec() > ctl->request->timings.received_ut + ctl->request->timeout_ms * USEC_PER_MS) + if(ctl->request->timeout_ms && now_monotonic_usec() > ctl->timings.received_ut + ctl->request->timeout_ms * USEC_PER_MS) // timed out return -2; @@ -384,7 +390,22 @@ static void buffer_json_contexts_v2_options_to_array(BUFFER *wb, CONTEXTS_V2_OPT buffer_json_add_array_item_string(wb, "search"); } -void buffer_json_agents_array_v2(BUFFER *wb, time_t now_s) { +void buffer_json_query_timings(BUFFER *wb, const char *key, struct query_timings *timings) { + timings->finished_ut = now_monotonic_usec(); + if(!timings->executed_ut) + timings->executed_ut = timings->finished_ut; + if(!timings->preprocessed_ut) + timings->preprocessed_ut = timings->received_ut; + buffer_json_member_add_object(wb, key); + buffer_json_member_add_double(wb, "prep_ms", (NETDATA_DOUBLE)(timings->preprocessed_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "query_ms", (NETDATA_DOUBLE)(timings->executed_ut - timings->preprocessed_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "output_ms", (NETDATA_DOUBLE)(timings->finished_ut - timings->executed_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "total_ms", (NETDATA_DOUBLE)(timings->finished_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_member_add_double(wb, "cloud_ms", (NETDATA_DOUBLE)(timings->finished_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_object_close(wb); +} + +void buffer_json_agents_array_v2(BUFFER *wb, struct query_timings *timings, time_t now_s) { if(!now_s) now_s = now_realtime_sec(); @@ -395,15 +416,30 @@ void buffer_json_agents_array_v2(BUFFER *wb, time_t now_s) { buffer_json_member_add_string(wb, "nm", rrdhost_hostname(localhost)); buffer_json_member_add_time_t(wb, "now", now_s); buffer_json_member_add_uint64(wb, "ai", 0); + + if(timings) + buffer_json_query_timings(wb, "timings", timings); + buffer_json_object_close(wb); buffer_json_array_close(wb); } +void buffer_json_cloud_timings(BUFFER *wb, const char *key, struct query_timings *timings) { + buffer_json_member_add_object(wb, key); + buffer_json_member_add_double(wb, "routing_ms", 0.0); + buffer_json_member_add_double(wb, "node_max_ms", 0.0); + buffer_json_member_add_double(wb, "total_ms", (NETDATA_DOUBLE)(timings->finished_ut - timings->received_ut) / USEC_PER_MS); + buffer_json_object_close(wb); +} + +void contexts_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { + struct rrdcontext_to_json_v2_entry *z = value; + string_freez(z->family); +} + int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTEXTS_V2_OPTIONS options) { int resp = HTTP_RESP_OK; - req->timings.processing_ut = now_monotonic_usec(); - if(options & CONTEXTS_V2_SEARCH) options |= CONTEXTS_V2_CONTEXTS; @@ -418,15 +454,22 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE .contexts.pattern = string_to_simple_pattern(req->contexts), .contexts.scope_pattern = string_to_simple_pattern(req->scope_contexts), .q.pattern = string_to_simple_pattern_nocase(req->q), + .timings = { + .received_ut = now_monotonic_usec(), + } }; - if(options & CONTEXTS_V2_CONTEXTS) - ctl.ctx = dictionary_create_advanced(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, sizeof(struct rrdcontext_to_json_v2_entry)); + if(options & CONTEXTS_V2_CONTEXTS) { + ctl.ctx = dictionary_create_advanced( + DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, + sizeof(struct rrdcontext_to_json_v2_entry)); + + dictionary_register_delete_callback(ctl.ctx, contexts_delete_callback, NULL); + } time_t now_s = now_realtime_sec(); buffer_json_initialize(wb, "\"", "\"", 0, true, false); buffer_json_member_add_uint64(wb, "api", 2); - buffer_json_agents_array_v2(wb, now_s); if(options & CONTEXTS_V2_DEBUG) { buffer_json_member_add_object(wb, "request"); @@ -473,7 +516,7 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE if(options & (CONTEXTS_V2_NODES | CONTEXTS_V2_NODES_DETAILED | CONTEXTS_V2_DEBUG)) buffer_json_array_close(wb); - req->timings.output_ut = now_monotonic_usec(); + ctl.timings.executed_ut = now_monotonic_usec(); version_hashes_api_v2(wb, &ctl.versions); if(options & CONTEXTS_V2_CONTEXTS) { @@ -506,13 +549,8 @@ int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTE buffer_json_object_close(wb); } - req->timings.finished_ut = now_monotonic_usec(); - buffer_json_member_add_object(wb, "timings"); - buffer_json_member_add_double(wb, "prep_ms", (NETDATA_DOUBLE)(req->timings.processing_ut - req->timings.received_ut) / USEC_PER_MS); - buffer_json_member_add_double(wb, "query_ms", (NETDATA_DOUBLE)(req->timings.output_ut - req->timings.processing_ut) / USEC_PER_MS); - buffer_json_member_add_double(wb, "output_ms", (NETDATA_DOUBLE)(req->timings.finished_ut - req->timings.output_ut) / USEC_PER_MS); - buffer_json_member_add_double(wb, "total_ms", (NETDATA_DOUBLE)(req->timings.finished_ut - req->timings.received_ut) / USEC_PER_MS); - buffer_json_object_close(wb); + buffer_json_agents_array_v2(wb, &ctl.timings, now_s); + buffer_json_cloud_timings(wb, "timings", &ctl.timings); buffer_json_finalize(wb); cleanup: diff --git a/database/contexts/query_target.c b/database/contexts/query_target.c index 0378f52017..19ab8caadd 100644 --- a/database/contexts/query_target.c +++ b/database/contexts/query_target.c @@ -12,42 +12,55 @@ static void query_instance_release(QUERY_INSTANCE *qi); static void query_context_release(QUERY_CONTEXT *qc); static void query_node_release(QUERY_NODE *qn); -static __thread QUERY_TARGET thread_query_target = {}; - -// ---------------------------------------------------------------------------- -// query API - -typedef struct query_target_locals { - time_t start_s; - - QUERY_TARGET *qt; +static __thread QUERY_TARGET *thread_qt = NULL; +static struct { + struct { + SPINLOCK spinlock; + size_t count; + QUERY_TARGET *base; + } available; - RRDSET *st; + struct { + SPINLOCK spinlock; + size_t count; + QUERY_TARGET *base; + } used; +} query_target_base = { + .available = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .base = NULL, + .count = 0, + }, + .used = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .base = NULL, + .count = 0, + }, +}; + +static void query_target_destroy(QUERY_TARGET *qt) { + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->query.size * sizeof(*qt->query.array), __ATOMIC_RELAXED); + freez(qt->query.array); - const char *scope_nodes; - const char *scope_contexts; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->dimensions.size * sizeof(*qt->dimensions.array), __ATOMIC_RELAXED); + freez(qt->dimensions.array); - const char *nodes; - const char *contexts; - const char *instances; - const char *dimensions; - const char *chart_label_key; - const char *labels; - const char *alerts; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->instances.size * sizeof(*qt->instances.array), __ATOMIC_RELAXED); + freez(qt->instances.array); - long long after; - long long before; - bool match_ids; - bool match_names; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->contexts.size * sizeof(*qt->contexts.array), __ATOMIC_RELAXED); + freez(qt->contexts.array); - size_t metrics_skipped_due_to_not_matching_timeframe; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->nodes.size * sizeof(*qt->nodes.array), __ATOMIC_RELAXED); + freez(qt->nodes.array); - char host_uuid_buffer[UUID_STR_LEN]; - QUERY_NODE *qn; // temp to pass on callbacks, ignore otherwise - no need to free -} QUERY_TARGET_LOCALS; + freez(qt); +} void query_target_release(QUERY_TARGET *qt) { - if(unlikely(!qt || !qt->used)) return; + if(unlikely(!qt)) return; + + internal_fatal(!qt->internal.used, "QUERY TARGET: qt to be released is not used"); simple_pattern_free(qt->nodes.scope_pattern); qt->nodes.scope_pattern = NULL; @@ -113,44 +126,91 @@ void query_target_release(QUERY_TARGET *qt) { qt->db.first_time_s = 0; qt->db.last_time_s = 0; - qt->group_by.used = 0; + for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES ;g++) + qt->group_by[g].used = 0; qt->id[0] = '\0'; - qt->used = false; + netdata_spinlock_lock(&query_target_base.used.spinlock); + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(query_target_base.used.base, qt, internal.prev, internal.next); + query_target_base.used.count--; + netdata_spinlock_unlock(&query_target_base.used.spinlock); + + qt->internal.used = false; + thread_qt = NULL; + + if (qt->internal.queries > 1000) { + query_target_destroy(qt); + } + else { + netdata_spinlock_lock(&query_target_base.available.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(query_target_base.available.base, qt, internal.prev, internal.next); + query_target_base.available.count++; + netdata_spinlock_unlock(&query_target_base.available.spinlock); + } } -void query_target_free(void) { - QUERY_TARGET *qt = &thread_query_target; - if(qt->used) - query_target_release(qt); +static QUERY_TARGET *query_target_get(void) { + netdata_spinlock_lock(&query_target_base.available.spinlock); + QUERY_TARGET *qt = query_target_base.available.base; + if (qt) { + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(query_target_base.available.base, qt, internal.prev, internal.next); + query_target_base.available.count--; + } + netdata_spinlock_unlock(&query_target_base.available.spinlock); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->query.size * sizeof(QUERY_METRIC), __ATOMIC_RELAXED); - freez(qt->query.array); - qt->query.array = NULL; - qt->query.size = 0; + if(unlikely(!qt)) + qt = callocz(1, sizeof(*qt)); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->dimensions.size * sizeof(RRDMETRIC_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->dimensions.array); - qt->dimensions.array = NULL; - qt->dimensions.size = 0; + netdata_spinlock_lock(&query_target_base.used.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(query_target_base.used.base, qt, internal.prev, internal.next); + query_target_base.used.count++; + netdata_spinlock_unlock(&query_target_base.used.spinlock); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->instances.array); - qt->instances.array = NULL; - qt->instances.size = 0; + qt->internal.used = true; + qt->internal.queries++; + thread_qt = qt; - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->contexts.array); - qt->contexts.array = NULL; - qt->contexts.size = 0; + return qt; +} - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->nodes.size * sizeof(RRDHOST *), __ATOMIC_RELAXED); - freez(qt->nodes.array); - qt->nodes.array = NULL; - qt->nodes.size = 0; +// this is used to release a query target from a cancelled thread +void query_target_free(void) { + query_target_release(thread_qt); } +// ---------------------------------------------------------------------------- +// query API + +typedef struct query_target_locals { + time_t start_s; + + QUERY_TARGET *qt; + + RRDSET *st; + + const char *scope_nodes; + const char *scope_contexts; + + const char *nodes; + const char *contexts; + const char *instances; + const char *dimensions; + const char *chart_label_key; + const char *labels; + const char *alerts; + + long long after; + long long before; + bool match_ids; + bool match_names; + + size_t metrics_skipped_due_to_not_matching_timeframe; + + char host_uuid_buffer[UUID_STR_LEN]; + QUERY_NODE *qn; // temp to pass on callbacks, ignore otherwise - no need to free +} QUERY_TARGET_LOCALS; + struct storage_engine *query_metric_storage_engine(QUERY_TARGET *qt, QUERY_METRIC *qm, size_t tier) { QUERY_NODE *qn = query_node(qt, qm->link.query_node_id); return qn->rrdhost->db[tier].eng; @@ -198,8 +258,8 @@ static bool query_metric_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CON tier_retention[tier].db_metric_handle = eng->api.metric_get(qn->rrdhost->db[tier].instance, &rm->uuid); if(tier_retention[tier].db_metric_handle) { - tier_retention[tier].db_first_time_s = tier_retention[tier].eng->api.query_ops.oldest_time_s(tier_retention[tier].db_metric_handle); - tier_retention[tier].db_last_time_s = tier_retention[tier].eng->api.query_ops.latest_time_s(tier_retention[tier].db_metric_handle); + tier_retention[tier].db_first_time_s = storage_engine_oldest_time_s(tier_retention[tier].eng->backend, tier_retention[tier].db_metric_handle); + tier_retention[tier].db_last_time_s = storage_engine_latest_time_s(tier_retention[tier].eng->backend, tier_retention[tier].db_metric_handle); if(!common_first_time_s) common_first_time_s = tier_retention[tier].db_first_time_s; @@ -301,7 +361,7 @@ static inline void query_dimension_release(QUERY_DIMENSION *qd) { qd->rma = NULL; } -static QUERY_DIMENSION *query_dimension_allocate(QUERY_TARGET *qt, RRDMETRIC_ACQUIRED *rma, QUERY_STATUS status) { +static QUERY_DIMENSION *query_dimension_allocate(QUERY_TARGET *qt, RRDMETRIC_ACQUIRED *rma, QUERY_STATUS status, size_t priority) { if(qt->dimensions.used == qt->dimensions.size) { size_t old_mem = qt->dimensions.size * sizeof(*qt->dimensions.array); qt->dimensions.size = query_target_realloc_size(qt->dimensions.size, 4); @@ -316,12 +376,13 @@ static QUERY_DIMENSION *query_dimension_allocate(QUERY_TARGET *qt, RRDMETRIC_ACQ qd->slot = qt->dimensions.used++; qd->rma = rrdmetric_acquired_dup(rma); qd->status = status; + qd->priority = priority; return qd; } static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CONTEXT *qc, QUERY_INSTANCE *qi, - RRDMETRIC_ACQUIRED *rma, bool queryable_instance, size_t *metrics_added) { + RRDMETRIC_ACQUIRED *rma, bool queryable_instance, size_t *metrics_added, size_t priority) { QUERY_TARGET *qt = qtl->qt; RRDMETRIC *rm = rrdmetric_acquired_value(rma); @@ -364,7 +425,7 @@ static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_ // the user selection does not match this dimension // but, we may still need to query it - if (qt->request.options & RRDR_OPTION_PERCENTAGE) { + if (query_target_needs_all_dimensions(qt)) { // this is percentage calculation // so, we need this dimension to calculate the percentage needed = true; @@ -389,7 +450,7 @@ static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_ status |= QUERY_STATUS_DIMENSION_HIDDEN; options |= RRDR_DIMENSION_HIDDEN; - if (qt->request.options & RRDR_OPTION_PERCENTAGE) { + if (query_target_needs_all_dimensions(qt)) { // this is percentage calculation // so, we need this dimension to calculate the percentage needed = true; @@ -430,7 +491,7 @@ static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_ if(undo) return false; - query_dimension_allocate(qt, rma, status); + query_dimension_allocate(qt, rma, status, priority); return true; } @@ -694,17 +755,17 @@ static bool query_instance_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_C if(queryable_instance && qt->request.version >= 2) query_target_eval_instance_rrdcalc(qtl, qn, qc, qi); - size_t dimensions_added = 0, metrics_added = 0; + size_t dimensions_added = 0, metrics_added = 0, priority = 0; if(unlikely(qt->request.rma)) { - if(query_dimension_add(qtl, qn, qc, qi, qt->request.rma, queryable_instance, &metrics_added)) + if(query_dimension_add(qtl, qn, qc, qi, qt->request.rma, queryable_instance, &metrics_added, priority++)) dimensions_added++; } else { RRDMETRIC *rm; dfe_start_read(ri->rrdmetrics, rm) { if(query_dimension_add(qtl, qn, qc, qi, (RRDMETRIC_ACQUIRED *) rm_dfe.item, - queryable_instance, &metrics_added)) + queryable_instance, &metrics_added, priority++)) dimensions_added++; } dfe_done(rm); @@ -957,13 +1018,7 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) { if(!service_running(ABILITY_DATA_QUERIES)) return NULL; - QUERY_TARGET *qt = &thread_query_target; - - if(qt->used) - fatal("QUERY TARGET: this query target is already used (%zu queries made with this QUERY_TARGET so far).", qt->queries); - - qt->used = true; - qt->queries++; + QUERY_TARGET *qt = query_target_get(); if(!qtr->received_ut) qtr->received_ut = now_monotonic_usec(); @@ -985,7 +1040,11 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) { query_target_generate_name(qt); qt->window.after = qt->request.after; qt->window.before = qt->request.before; + qt->window.options = qt->request.options; + if(query_target_has_percentage_of_instance(qt)) + qt->window.options &= ~RRDR_OPTION_PERCENTAGE; + rrdr_relative_window_to_absolute(&qt->window.after, &qt->window.before, &qt->window.now); // prepare our local variables - we need these across all these functions diff --git a/database/contexts/rrdcontext.h b/database/contexts/rrdcontext.h index f42c43135b..5093e1d614 100644 --- a/database/contexts/rrdcontext.h +++ b/database/contexts/rrdcontext.h @@ -201,6 +201,7 @@ typedef struct query_instance { typedef struct query_dimension { uint32_t slot; + uint32_t priority; RRDMETRIC_ACQUIRED *rma; QUERY_STATUS status; } QUERY_DIMENSION; @@ -231,7 +232,8 @@ typedef struct query_metric { STORAGE_POINT query_points; struct { - size_t slot; + uint32_t slot; + uint32_t first_slot; STRING *id; STRING *name; STRING *units; @@ -241,9 +243,16 @@ typedef struct query_metric { } QUERY_METRIC; #define MAX_QUERY_TARGET_ID_LENGTH 255 +#define MAX_QUERY_GROUP_BY_PASSES 2 typedef bool (*qt_interrupt_callback_t)(void *data); +struct group_by_pass { + RRDR_GROUP_BY group_by; + char *group_by_label; + RRDR_GROUP_BY_FUNCTION aggregation; +}; + typedef struct query_target_request { size_t version; @@ -284,9 +293,7 @@ typedef struct query_target_request { const char *time_group_options; // group by across multiple time-series - RRDR_GROUP_BY group_by; - char *group_by_label; - RRDR_GROUP_BY_FUNCTION group_by_aggregate_function; + struct group_by_pass group_by[MAX_QUERY_GROUP_BY_PASSES]; usec_t received_ut; @@ -313,15 +320,19 @@ struct query_versions { uint64_t alerts_soft_hash; }; +struct query_timings { + usec_t received_ut; + usec_t preprocessed_ut; + usec_t executed_ut; + usec_t finished_ut; +}; + #define query_view_update_every(qt) ((qt)->window.group * (qt)->window.query_granularity) typedef struct query_target { char id[MAX_QUERY_TARGET_ID_LENGTH + 1]; // query identifier (for logging) QUERY_TARGET_REQUEST request; - bool used; // when true, this query is currently being used - size_t queries; // how many query we have done so far with this QUERY_TARGET - not related to database queries - struct { time_t now; // the current timestamp, the absolute max for any query timestamp bool relative; // true when the request made with relative timestamps, true if it was absolute @@ -388,19 +399,20 @@ typedef struct query_target { struct { size_t used; - char *label_keys[GROUP_BY_MAX_LABEL_KEYS]; - } group_by; + char *label_keys[GROUP_BY_MAX_LABEL_KEYS * MAX_QUERY_GROUP_BY_PASSES]; + } group_by[MAX_QUERY_GROUP_BY_PASSES]; STORAGE_POINT query_points; - struct query_versions versions; + struct query_timings timings; struct { - usec_t received_ut; - usec_t preprocessed_ut; - usec_t executed_ut; - usec_t finished_ut; - } timings; + SPINLOCK spinlock; + bool used; // when true, this query is currently being used + size_t queries; // how many query we have done so far with this QUERY_TARGET - not related to database queries + struct query_target *prev; + struct query_target *next; + } internal; } QUERY_TARGET; static inline NEVERNULL QUERY_NODE *query_node(QUERY_TARGET *qt, size_t id) { @@ -455,13 +467,6 @@ struct api_v2_contexts_request { char *contexts; char *q; - struct { - usec_t received_ut; - usec_t processing_ut; - usec_t output_ut; - usec_t finished_ut; - } timings; - time_t timeout_ms; qt_interrupt_callback_t interrupt_callback; @@ -479,8 +484,10 @@ typedef enum __attribute__ ((__packed__)) { int rrdcontext_to_json_v2(BUFFER *wb, struct api_v2_contexts_request *req, CONTEXTS_V2_OPTIONS options); RRDCONTEXT_TO_JSON_OPTIONS rrdcontext_to_json_parse_options(char *o); -void buffer_json_agents_array_v2(BUFFER *wb, time_t now_s); +void buffer_json_agents_array_v2(BUFFER *wb, struct query_timings *timings, time_t now_s); void buffer_json_node_add_v2(BUFFER *wb, RRDHOST *host, size_t ni, usec_t duration_ut); +void buffer_json_query_timings(BUFFER *wb, const char *key, struct query_timings *timings); +void buffer_json_cloud_timings(BUFFER *wb, const char *key, struct query_timings *timings); // ---------------------------------------------------------------------------- // scope @@ -515,5 +522,29 @@ bool rrdcontext_retention_match(RRDCONTEXT_ACQUIRED *rca, time_t after, time_t b (((first_entry_s) - ((update_every_s) * 2) <= (before)) && \ ((last_entry_s) + ((update_every_s) * 2) >= (after))) +#define query_target_aggregatable(qt) ((qt)->window.options & RRDR_OPTION_RETURN_RAW) + +static inline bool query_target_has_percentage_of_instance(QUERY_TARGET *qt) { + for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES ;g++) + if(qt->request.group_by[g].group_by & RRDR_GROUP_BY_PERCENTAGE_OF_INSTANCE) + return true; + + return false; +} + +static inline bool query_target_needs_all_dimensions(QUERY_TARGET *qt) { + if(qt->request.options & RRDR_OPTION_PERCENTAGE) + return true; + + return query_target_has_percentage_of_instance(qt); +} + +static inline bool query_target_has_percentage_units(QUERY_TARGET *qt) { + if(qt->window.time_group_method == RRDR_GROUPING_CV || query_target_needs_all_dimensions(qt)) + return true; + + return false; +} + #endif // NETDATA_RRDCONTEXT_H diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 9606632ab5..02d08a1647 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -769,7 +769,10 @@ inline void rrdeng_prep_wait(PDC *pdc) { } } -void rrdeng_prep_query(PDC *pdc) { +void rrdeng_prep_query(struct page_details_control *pdc, bool worker) { + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_QUERY); + size_t pages_to_load = 0; pdc->page_list_JudyL = get_page_list(pdc->ctx, pdc->metric, pdc->start_time_s * USEC_PER_SEC, @@ -792,6 +795,9 @@ void rrdeng_prep_query(PDC *pdc) { completion_mark_complete(&pdc->prep_completion); pdc_release_and_destroy_if_unreferenced(pdc, true, true); + + if(worker) + worker_is_idle(); } /** @@ -824,7 +830,7 @@ void pg_cache_preload(struct rrdeng_query_handle *handle) { handle->pdc->refcount++; // we get 1 for the query thread and 1 for the prep thread if(unlikely(handle->pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS)) - rrdeng_prep_query(handle->pdc); + rrdeng_prep_query(handle->pdc, false); else rrdeng_enq_cmd(handle->ctx, RRDENG_OPCODE_QUERY, handle->pdc, NULL, handle->priority, NULL, NULL); } diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index ac7202f440..5242db89e5 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -52,7 +52,7 @@ struct rrdeng_query_handle; struct page_details_control; void rrdeng_prep_wait(struct page_details_control *pdc); -void rrdeng_prep_query(struct page_details_control *pdc); +void rrdeng_prep_query(struct page_details_control *pdc, bool worker); void pg_cache_preload(struct rrdeng_query_handle *handle); struct pgc_page *pg_cache_lookup_next(struct rrdengine_instance *ctx, struct page_details_control *pdc, time_t now_s, time_t last_update_every_s, size_t *entries); void pgc_and_mrg_initialize(void); diff --git a/database/engine/pdc.c b/database/engine/pdc.c index a29122cf54..42fb2f6de5 100644 --- a/database/engine/pdc.c +++ b/database/engine/pdc.c @@ -1151,6 +1151,9 @@ static inline void datafile_extent_read_free(void *buffer) { } void epdl_find_extent_and_populate_pages(struct rrdengine_instance *ctx, EPDL *epdl, bool worker) { + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP); + size_t *statistics_counter = NULL; PDC_PAGE_STATUS not_loaded_pages_tag = 0, loaded_pages_tag = 0; @@ -1173,9 +1176,6 @@ void epdl_find_extent_and_populate_pages(struct rrdengine_instance *ctx, EPDL *e goto cleanup; } - if(worker) - worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP); - bool extent_found_in_cache = false; void *extent_compressed_data = NULL; diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index fae6656d93..7811a5eaaf 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -16,6 +16,24 @@ unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT; #error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2) #endif +struct rrdeng_cmd { + struct rrdengine_instance *ctx; + enum rrdeng_opcode opcode; + void *data; + struct completion *completion; + enum storage_priority priority; + dequeue_callback_t dequeue_cb; + + struct { + struct rrdeng_cmd *prev; + struct rrdeng_cmd *next; + } queue; +}; + +static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker); +static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker); +static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker); + struct rrdeng_main { uv_thread_t thread; uv_loop_t loop; @@ -45,7 +63,6 @@ struct rrdeng_main { struct { size_t dispatched; size_t executing; - size_t pending_cb; } atomics; } work_cmd; @@ -132,8 +149,22 @@ static void work_request_init(void) { ); } -static inline bool work_request_full(void) { - return __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED) >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS); +enum LIBUV_WORKERS_STATUS { + LIBUV_WORKERS_RELAXED, + LIBUV_WORKERS_STRESSED, + LIBUV_WORKERS_CRITICAL, +}; + +static inline enum LIBUV_WORKERS_STATUS work_request_full(void) { + size_t dispatched = __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED); + + if(dispatched >= (size_t)(libuv_worker_threads)) + return LIBUV_WORKERS_CRITICAL; + + else if(dispatched >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS)) + return LIBUV_WORKERS_STRESSED; + + return LIBUV_WORKERS_RELAXED; } static inline void work_done(struct rrdeng_work *work_request) { @@ -147,12 +178,38 @@ static void work_standard_worker(uv_work_t *req) { worker_is_busy(UV_EVENT_WORKER_INIT); struct rrdeng_work *work_request = req->data; + work_request->data = work_request->work_cb(work_request->ctx, work_request->data, work_request->completion, req); worker_is_idle(); + if(work_request->opcode == RRDENG_OPCODE_EXTENT_READ || work_request->opcode == RRDENG_OPCODE_QUERY) { + internal_fatal(work_request->after_work_cb != NULL, "DBENGINE: opcodes with a callback should not boosted"); + + while(1) { + struct rrdeng_cmd cmd = rrdeng_deq_cmd(true); + if (cmd.opcode == RRDENG_OPCODE_NOOP) + break; + + worker_is_busy(UV_EVENT_WORKER_INIT); + switch (cmd.opcode) { + case RRDENG_OPCODE_EXTENT_READ: + worker_dispatch_extent_read(cmd, true); + break; + + case RRDENG_OPCODE_QUERY: + worker_dispatch_query_prep(cmd, true); |