diff options
Diffstat (limited to 'database/contexts/query_target.c')
-rw-r--r-- | database/contexts/query_target.c | 201 |
1 files changed, 130 insertions, 71 deletions
diff --git a/database/contexts/query_target.c b/database/contexts/query_target.c index 0378f52017..19ab8caadd 100644 --- a/database/contexts/query_target.c +++ b/database/contexts/query_target.c @@ -12,42 +12,55 @@ static void query_instance_release(QUERY_INSTANCE *qi); static void query_context_release(QUERY_CONTEXT *qc); static void query_node_release(QUERY_NODE *qn); -static __thread QUERY_TARGET thread_query_target = {}; - -// ---------------------------------------------------------------------------- -// query API - -typedef struct query_target_locals { - time_t start_s; - - QUERY_TARGET *qt; +static __thread QUERY_TARGET *thread_qt = NULL; +static struct { + struct { + SPINLOCK spinlock; + size_t count; + QUERY_TARGET *base; + } available; - RRDSET *st; + struct { + SPINLOCK spinlock; + size_t count; + QUERY_TARGET *base; + } used; +} query_target_base = { + .available = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .base = NULL, + .count = 0, + }, + .used = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .base = NULL, + .count = 0, + }, +}; + +static void query_target_destroy(QUERY_TARGET *qt) { + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->query.size * sizeof(*qt->query.array), __ATOMIC_RELAXED); + freez(qt->query.array); - const char *scope_nodes; - const char *scope_contexts; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->dimensions.size * sizeof(*qt->dimensions.array), __ATOMIC_RELAXED); + freez(qt->dimensions.array); - const char *nodes; - const char *contexts; - const char *instances; - const char *dimensions; - const char *chart_label_key; - const char *labels; - const char *alerts; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->instances.size * sizeof(*qt->instances.array), __ATOMIC_RELAXED); + freez(qt->instances.array); - long long after; - long long before; - bool match_ids; - bool match_names; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->contexts.size * sizeof(*qt->contexts.array), __ATOMIC_RELAXED); + freez(qt->contexts.array); - size_t metrics_skipped_due_to_not_matching_timeframe; + __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->nodes.size * sizeof(*qt->nodes.array), __ATOMIC_RELAXED); + freez(qt->nodes.array); - char host_uuid_buffer[UUID_STR_LEN]; - QUERY_NODE *qn; // temp to pass on callbacks, ignore otherwise - no need to free -} QUERY_TARGET_LOCALS; + freez(qt); +} void query_target_release(QUERY_TARGET *qt) { - if(unlikely(!qt || !qt->used)) return; + if(unlikely(!qt)) return; + + internal_fatal(!qt->internal.used, "QUERY TARGET: qt to be released is not used"); simple_pattern_free(qt->nodes.scope_pattern); qt->nodes.scope_pattern = NULL; @@ -113,44 +126,91 @@ void query_target_release(QUERY_TARGET *qt) { qt->db.first_time_s = 0; qt->db.last_time_s = 0; - qt->group_by.used = 0; + for(size_t g = 0; g < MAX_QUERY_GROUP_BY_PASSES ;g++) + qt->group_by[g].used = 0; qt->id[0] = '\0'; - qt->used = false; + netdata_spinlock_lock(&query_target_base.used.spinlock); + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(query_target_base.used.base, qt, internal.prev, internal.next); + query_target_base.used.count--; + netdata_spinlock_unlock(&query_target_base.used.spinlock); + + qt->internal.used = false; + thread_qt = NULL; + + if (qt->internal.queries > 1000) { + query_target_destroy(qt); + } + else { + netdata_spinlock_lock(&query_target_base.available.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(query_target_base.available.base, qt, internal.prev, internal.next); + query_target_base.available.count++; + netdata_spinlock_unlock(&query_target_base.available.spinlock); + } } -void query_target_free(void) { - QUERY_TARGET *qt = &thread_query_target; - if(qt->used) - query_target_release(qt); +static QUERY_TARGET *query_target_get(void) { + netdata_spinlock_lock(&query_target_base.available.spinlock); + QUERY_TARGET *qt = query_target_base.available.base; + if (qt) { + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(query_target_base.available.base, qt, internal.prev, internal.next); + query_target_base.available.count--; + } + netdata_spinlock_unlock(&query_target_base.available.spinlock); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->query.size * sizeof(QUERY_METRIC), __ATOMIC_RELAXED); - freez(qt->query.array); - qt->query.array = NULL; - qt->query.size = 0; + if(unlikely(!qt)) + qt = callocz(1, sizeof(*qt)); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->dimensions.size * sizeof(RRDMETRIC_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->dimensions.array); - qt->dimensions.array = NULL; - qt->dimensions.size = 0; + netdata_spinlock_lock(&query_target_base.used.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(query_target_base.used.base, qt, internal.prev, internal.next); + query_target_base.used.count++; + netdata_spinlock_unlock(&query_target_base.used.spinlock); - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->instances.size * sizeof(RRDINSTANCE_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->instances.array); - qt->instances.array = NULL; - qt->instances.size = 0; + qt->internal.used = true; + qt->internal.queries++; + thread_qt = qt; - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->contexts.size * sizeof(RRDCONTEXT_ACQUIRED *), __ATOMIC_RELAXED); - freez(qt->contexts.array); - qt->contexts.array = NULL; - qt->contexts.size = 0; + return qt; +} - __atomic_sub_fetch(&netdata_buffers_statistics.query_targets_size, qt->nodes.size * sizeof(RRDHOST *), __ATOMIC_RELAXED); - freez(qt->nodes.array); - qt->nodes.array = NULL; - qt->nodes.size = 0; +// this is used to release a query target from a cancelled thread +void query_target_free(void) { + query_target_release(thread_qt); } +// ---------------------------------------------------------------------------- +// query API + +typedef struct query_target_locals { + time_t start_s; + + QUERY_TARGET *qt; + + RRDSET *st; + + const char *scope_nodes; + const char *scope_contexts; + + const char *nodes; + const char *contexts; + const char *instances; + const char *dimensions; + const char *chart_label_key; + const char *labels; + const char *alerts; + + long long after; + long long before; + bool match_ids; + bool match_names; + + size_t metrics_skipped_due_to_not_matching_timeframe; + + char host_uuid_buffer[UUID_STR_LEN]; + QUERY_NODE *qn; // temp to pass on callbacks, ignore otherwise - no need to free +} QUERY_TARGET_LOCALS; + struct storage_engine *query_metric_storage_engine(QUERY_TARGET *qt, QUERY_METRIC *qm, size_t tier) { QUERY_NODE *qn = query_node(qt, qm->link.query_node_id); return qn->rrdhost->db[tier].eng; @@ -198,8 +258,8 @@ static bool query_metric_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CON tier_retention[tier].db_metric_handle = eng->api.metric_get(qn->rrdhost->db[tier].instance, &rm->uuid); if(tier_retention[tier].db_metric_handle) { - tier_retention[tier].db_first_time_s = tier_retention[tier].eng->api.query_ops.oldest_time_s(tier_retention[tier].db_metric_handle); - tier_retention[tier].db_last_time_s = tier_retention[tier].eng->api.query_ops.latest_time_s(tier_retention[tier].db_metric_handle); + tier_retention[tier].db_first_time_s = storage_engine_oldest_time_s(tier_retention[tier].eng->backend, tier_retention[tier].db_metric_handle); + tier_retention[tier].db_last_time_s = storage_engine_latest_time_s(tier_retention[tier].eng->backend, tier_retention[tier].db_metric_handle); if(!common_first_time_s) common_first_time_s = tier_retention[tier].db_first_time_s; @@ -301,7 +361,7 @@ static inline void query_dimension_release(QUERY_DIMENSION *qd) { qd->rma = NULL; } -static QUERY_DIMENSION *query_dimension_allocate(QUERY_TARGET *qt, RRDMETRIC_ACQUIRED *rma, QUERY_STATUS status) { +static QUERY_DIMENSION *query_dimension_allocate(QUERY_TARGET *qt, RRDMETRIC_ACQUIRED *rma, QUERY_STATUS status, size_t priority) { if(qt->dimensions.used == qt->dimensions.size) { size_t old_mem = qt->dimensions.size * sizeof(*qt->dimensions.array); qt->dimensions.size = query_target_realloc_size(qt->dimensions.size, 4); @@ -316,12 +376,13 @@ static QUERY_DIMENSION *query_dimension_allocate(QUERY_TARGET *qt, RRDMETRIC_ACQ qd->slot = qt->dimensions.used++; qd->rma = rrdmetric_acquired_dup(rma); qd->status = status; + qd->priority = priority; return qd; } static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_CONTEXT *qc, QUERY_INSTANCE *qi, - RRDMETRIC_ACQUIRED *rma, bool queryable_instance, size_t *metrics_added) { + RRDMETRIC_ACQUIRED *rma, bool queryable_instance, size_t *metrics_added, size_t priority) { QUERY_TARGET *qt = qtl->qt; RRDMETRIC *rm = rrdmetric_acquired_value(rma); @@ -364,7 +425,7 @@ static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_ // the user selection does not match this dimension // but, we may still need to query it - if (qt->request.options & RRDR_OPTION_PERCENTAGE) { + if (query_target_needs_all_dimensions(qt)) { // this is percentage calculation // so, we need this dimension to calculate the percentage needed = true; @@ -389,7 +450,7 @@ static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_ status |= QUERY_STATUS_DIMENSION_HIDDEN; options |= RRDR_DIMENSION_HIDDEN; - if (qt->request.options & RRDR_OPTION_PERCENTAGE) { + if (query_target_needs_all_dimensions(qt)) { // this is percentage calculation // so, we need this dimension to calculate the percentage needed = true; @@ -430,7 +491,7 @@ static bool query_dimension_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_ if(undo) return false; - query_dimension_allocate(qt, rma, status); + query_dimension_allocate(qt, rma, status, priority); return true; } @@ -694,17 +755,17 @@ static bool query_instance_add(QUERY_TARGET_LOCALS *qtl, QUERY_NODE *qn, QUERY_C if(queryable_instance && qt->request.version >= 2) query_target_eval_instance_rrdcalc(qtl, qn, qc, qi); - size_t dimensions_added = 0, metrics_added = 0; + size_t dimensions_added = 0, metrics_added = 0, priority = 0; if(unlikely(qt->request.rma)) { - if(query_dimension_add(qtl, qn, qc, qi, qt->request.rma, queryable_instance, &metrics_added)) + if(query_dimension_add(qtl, qn, qc, qi, qt->request.rma, queryable_instance, &metrics_added, priority++)) dimensions_added++; } else { RRDMETRIC *rm; dfe_start_read(ri->rrdmetrics, rm) { if(query_dimension_add(qtl, qn, qc, qi, (RRDMETRIC_ACQUIRED *) rm_dfe.item, - queryable_instance, &metrics_added)) + queryable_instance, &metrics_added, priority++)) dimensions_added++; } dfe_done(rm); @@ -957,13 +1018,7 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) { if(!service_running(ABILITY_DATA_QUERIES)) return NULL; - QUERY_TARGET *qt = &thread_query_target; - - if(qt->used) - fatal("QUERY TARGET: this query target is already used (%zu queries made with this QUERY_TARGET so far).", qt->queries); - - qt->used = true; - qt->queries++; + QUERY_TARGET *qt = query_target_get(); if(!qtr->received_ut) qtr->received_ut = now_monotonic_usec(); @@ -985,7 +1040,11 @@ QUERY_TARGET *query_target_create(QUERY_TARGET_REQUEST *qtr) { query_target_generate_name(qt); qt->window.after = qt->request.after; qt->window.before = qt->request.before; + qt->window.options = qt->request.options; + if(query_target_has_percentage_of_instance(qt)) + qt->window.options &= ~RRDR_OPTION_PERCENTAGE; + rrdr_relative_window_to_absolute(&qt->window.after, &qt->window.before, &qt->window.now); // prepare our local variables - we need these across all these functions |