diff options
Diffstat (limited to 'web/api/queries/query.c')
-rw-r--r-- | web/api/queries/query.c | 87 |
1 files changed, 62 insertions, 25 deletions
diff --git a/web/api/queries/query.c b/web/api/queries/query.c index 3f89ce22f2..e09d0e22c9 100644 --- a/web/api/queries/query.c +++ b/web/api/queries/query.c @@ -679,9 +679,6 @@ RRDR_GROUP_BY group_by_parse(char *s) { group_by |= RRDR_GROUP_BY_LABEL; } - if(group_by == RRDR_GROUP_BY_NONE) - group_by = RRDR_GROUP_BY_DIMENSION; - return group_by; } @@ -971,7 +968,8 @@ typedef struct query_point { time_t end_time; time_t start_time; NETDATA_DOUBLE value; - NETDATA_DOUBLE anomaly; + size_t anomaly_outlier_points; + size_t anomaly_all_points; SN_FLAGS flags; #ifdef NETDATA_INTERNAL_CHECKS size_t id; @@ -982,7 +980,8 @@ QUERY_POINT QUERY_POINT_EMPTY = { .end_time = 0, .start_time = 0, .value = NAN, - .anomaly = 0, + .anomaly_outlier_points = 0, + .anomaly_all_points = 0, .flags = SN_FLAG_NONE, #ifdef NETDATA_INTERNAL_CHECKS .id = 0, @@ -1022,7 +1021,8 @@ typedef struct query_engine_ops { NETDATA_DOUBLE (*grouping_flush)(struct rrdresult *r, RRDR_VALUE_FLAGS *rrdr_value_options_ptr); size_t group_points_non_zero; size_t group_points_added; - NETDATA_DOUBLE group_anomaly_rate; + size_t group_anomaly_outlier_points; + size_t group_anomaly_all_points; RRDR_VALUE_FLAGS group_value_flags; // statistics @@ -1087,6 +1087,8 @@ static void query_planer_initialize_plans(QUERY_ENGINE_OPS *ops) { qm->plan.array[p].expanded_after = after; qm->plan.array[p].expanded_before = before; + ops->r->internal.qt->db.tiers[tier].queries++; + struct query_metric_tier *tier_ptr = &qm->tiers[tier]; tier_ptr->eng->api.query_ops.init( tier_ptr->db_metric_handle, @@ -1350,7 +1352,8 @@ static bool query_plan(QUERY_ENGINE_OPS *ops, time_t after_wanted, time_t before } \ \ (ops)->group_points_added++; \ - (ops)->group_anomaly_rate += (point).anomaly; \ + (ops)->group_anomaly_outlier_points = (point).anomaly_outlier_points; \ + (ops)->group_anomaly_all_points = (point).anomaly_all_points; \ } while(0) static QUERY_ENGINE_OPS *rrd2rrdr_query_prep(RRDR *r, size_t dim_id_in_rrdr) { @@ -1484,7 +1487,8 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ new_point.start_time = sp.start_time_s; new_point.end_time = sp.end_time_s; - new_point.anomaly = sp.count ? (NETDATA_DOUBLE)sp.anomaly_count * 100.0 / (NETDATA_DOUBLE)sp.count : 0.0; + new_point.anomaly_outlier_points = sp.anomaly_count; + new_point.anomaly_all_points = sp.count; query_point_set_id(new_point, ops->db_total_points_read); // if(debug_this) @@ -1495,7 +1499,7 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ if(likely(!storage_point_is_unset(sp) && !storage_point_is_gap(sp))) { if(unlikely(use_anomaly_bit_as_value)) - new_point.value = new_point.anomaly; + new_point.value = new_point.anomaly_all_points ? (NETDATA_DOUBLE)new_point.anomaly_outlier_points * 100.0 / (NETDATA_DOUBLE)new_point.anomaly_all_points : 0.0; else { switch (ops->tier_query_fetch) { @@ -1688,10 +1692,9 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ NETDATA_DOUBLE group_value = ops->grouping_flush(r, rrdr_value_options_ptr); r->v[rrdr_o_v_index] = group_value; - // we only store uint8_t anomaly rates, - // so let's get double precision by storing - // anomaly rates in the range 0 - 200 - r->ar[rrdr_o_v_index] = ops->group_anomaly_rate / (NETDATA_DOUBLE)ops->group_points_added; + NETDATA_DOUBLE group_ar = r->ar[rrdr_o_v_index] = ops->group_anomaly_all_points ? + (NETDATA_DOUBLE)ops->group_anomaly_outlier_points * 100.0 / (NETDATA_DOUBLE)ops->group_anomaly_all_points + : 0.0; if(likely(points_added || dim_id_in_rrdr)) { // find the min/max across all dimensions @@ -1720,15 +1723,17 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ qm->query_stats.max = stats_value; } + qm->query_stats.anomaly_sum += group_ar; qm->query_stats.sum += stats_value; qm->query_stats.volume += stats_value * (NETDATA_DOUBLE)ops->view_update_every; - qm->query_stats.count++; + qm->query_stats.group_points++; points_added++; ops->group_points_added = 0; ops->group_value_flags = RRDR_VALUE_NOTHING; ops->group_points_non_zero = 0; - ops->group_anomaly_rate = 0; + ops->group_anomaly_outlier_points = 0; + ops->group_anomaly_all_points = 0; } // the loop above increased "now" by query_granularity, // but the main loop will increase it too, @@ -1741,7 +1746,7 @@ static void rrd2rrdr_query_execute(RRDR *r, size_t dim_id_in_rrdr, QUERY_ENGINE_ r->stats.result_points_generated += points_added; r->stats.db_points_read += ops->db_total_points_read; for(size_t tr = 0; tr < storage_tiers ; tr++) - r->stats.tier_points_read[tr] += ops->db_points_read_per_tier[tr]; + qt->db.tiers[tr].points += ops->db_points_read_per_tier[tr]; r->view.min = min; r->view.max = max; @@ -2283,11 +2288,12 @@ RRDR *rrd2rrdr_legacy( } void query_target_merge_data_statistics(struct query_data_statistics *d, struct query_data_statistics *s) { - if(!d->count) + if(!d->group_points) *d = *s; else { - d->count += s->count; + d->group_points += s->group_points; d->sum += s->sum; + d->anomaly_sum += s->anomaly_sum; d->volume += s->volume; if(s->min < d->min) @@ -2361,7 +2367,7 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { if(qt->query.used) ops = onewayalloc_callocz(r->internal.owa, qt->query.used, sizeof(QUERY_ENGINE_OPS *)); - size_t capacity = libuv_worker_threads * 2; + size_t capacity = libuv_worker_threads * 10; size_t max_queries_to_prepare = (qt->query.used > (capacity - 1)) ? (capacity - 1) : qt->query.used; size_t queries_prepared = 0; while(queries_prepared < max_queries_to_prepare) { @@ -2375,7 +2381,7 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { QUERY_DIMENSION *qd = query_dimension(qt, qm->link.query_dimension_id); QUERY_INSTANCE *qi = query_instance(qt, qm->link.query_instance_id); QUERY_CONTEXT *qc = query_context(qt, qm->link.query_context_id); - QUERY_HOST *qh = query_host(qt, qm->link.query_host_id); + QUERY_NODE *qn = query_node(qt, qm->link.query_host_id); if(queries_prepared < max) { // preload another query @@ -2390,30 +2396,30 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { r->grouping.reset(r); if(ops[c]) { + rrd2rrdr_query_execute(r, c, ops[c]); + r->od[c] |= RRDR_DIMENSION_QUERIED; r->di[c] = rrdmetric_acquired_id_dup(qd->rma); r->dn[c] = rrdmetric_acquired_name_dup(qd->rma); qi->metrics.queried++; qc->metrics.queried++; - qh->metrics.queried++; + qn->metrics.queried++; qd->status |= QUERY_STATUS_QUERIED; qm->status |= RRDR_DIMENSION_QUERIED; - rrd2rrdr_query_execute(r, c, ops[c]); - if(qt->request.version >= 2) { query_target_merge_data_statistics(&qi->query_stats, &qm->query_stats); query_target_merge_data_statistics(&qc->query_stats, &qm->query_stats); - query_target_merge_data_statistics(&qh->query_stats, &qm->query_stats); + query_target_merge_data_statistics(&qn->query_stats, &qm->query_stats); query_target_merge_data_statistics(&qt->query_stats, &qm->query_stats); } } else { qi->metrics.failed++; qc->metrics.failed++; - qh->metrics.failed++; + qn->metrics.failed++; qd->status |= QUERY_STATUS_FAILED; qm->status |= RRDR_DIMENSION_FAILED; @@ -2480,6 +2486,37 @@ RRDR *rrd2rrdr(ONEWAYALLOC *owa, QUERY_TARGET *qt) { } } + // update query instance counts in query host and query context + if(qt->request.version >= 2) { + size_t h = 0, c = 0, i = 0; + for(; h < qt->nodes.used ; h++) { + QUERY_NODE *qn = &qt->nodes.array[h]; + + for(; c < qt->contexts.used ;c++) { + QUERY_CONTEXT *qc = &qt->contexts.array[c]; + + if(!rrdcontext_acquired_belongs_to_host(qc->rca, qn->rrdhost)) + break; + + for(; i < qt->instances.used ;i++) { + QUERY_INSTANCE *qi = &qt->instances.array[i]; + + if(!rrdinstance_acquired_belongs_to_context(qi->ria, qc->rca)) + break; + + if(qi->metrics.queried) { + qc->instances.queried++; + qn->instances.queried++; + } + else if(qi->metrics.failed) { + qc->instances.failed++; + qn->instances.failed++; + } + } + } + } + } + #ifdef NETDATA_INTERNAL_CHECKS if (dimensions_used) { if(r->internal.log) |