diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2020-11-06 10:16:20 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-06 10:16:20 +0100 |
commit | 144800420e305dfa352aab47ab324af19b320c83 (patch) | |
tree | 2d8c08a6ca4cdea0009b7da122bdacdb7165a6e2 /aclk | |
parent | 767afeeb393698f3a8c3d483f7bce32dd468a911 (diff) |
Adds metric showing how long Query spent in Queue (#10016)
* implements new ACLK metric
* cleans ACLK_stats code a bit
Diffstat (limited to 'aclk')
-rw-r--r-- | aclk/aclk_query.c | 23 | ||||
-rw-r--r-- | aclk/aclk_stats.c | 119 | ||||
-rw-r--r-- | aclk/aclk_stats.h | 43 | ||||
-rw-r--r-- | aclk/mqtt.c | 10 |
4 files changed, 111 insertions, 84 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c index cd1cfd1e3e..5e4b879512 100644 --- a/aclk/aclk_query.c +++ b/aclk/aclk_query.c @@ -22,6 +22,7 @@ static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER; struct aclk_query { usec_t created; + usec_t created_boot_time; time_t run_after; // Delay run until after this time ACLK_CMD cmd; // What command is this char *topic; // Topic to respond to @@ -233,6 +234,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run new_query->data = data; new_query->next = NULL; new_query->created = now_realtime_usec(); + new_query->created_boot_time = now_boottime_usec(); new_query->run_after = run_after; debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : ""); @@ -319,22 +321,15 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli #pragma region ACLK_QUERY #endif -static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url) +static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created) { - usec_t t; + usec_t t = now_boottime_usec(); + aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created); - t = now_monotonic_high_precision_usec(); w->response.code = web_client_api_request_v1(host, w, url); - t = now_monotonic_high_precision_usec() - t; + t = now_boottime_usec() - t; - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - aclk_metrics_per_sample.cloud_q_process_total += t; - aclk_metrics_per_sample.cloud_q_process_count++; - if (aclk_metrics_per_sample.cloud_q_process_max < t) - aclk_metrics_per_sample.cloud_q_process_max = t; - ACLK_STATS_UNLOCK; - } + aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t); return t; } @@ -361,7 +356,7 @@ static int aclk_execute_query(struct aclk_query *this_query) mysep = strrchr(this_query->query, '/'); // TODO: handle bad response perhaps in a different way. For now it does to the payload - aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); + aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); now_realtime_timeval(&w->tv_ready); w->response.data->date = w->tv_ready.tv_sec; web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready @@ -427,7 +422,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query) mysep = strrchr(this_query->query, '/'); // execute the query - t = aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop"); + t = aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time); #ifdef NETDATA_WITH_ZLIB // check if gzip encoding can and should be used diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index 50378b134c..2a57cd6f0d 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -20,6 +20,51 @@ struct aclk_metrics aclk_metrics = { struct aclk_metrics_per_sample aclk_metrics_per_sample; +struct aclk_mat_metrics aclk_mat_metrics = { +#ifdef NETDATA_INTERNAL_CHECKS + .latency = { .name = "aclk_latency_mqtt", + .prio = 200002, + .st = NULL, + .rd_avg = NULL, + .rd_max = NULL, + .rd_total = NULL, + .unit = "ms", + .title = "ACLK Message Publish Latency" }, +#endif + + .cloud_q_db_query_time = { .name = "aclk_db_query_time", + .prio = 200006, + .st = NULL, + .rd_avg = NULL, + .rd_max = NULL, + .rd_total = NULL, + .unit = "us", + .title = "Time it took to process cloud requested DB queries" }, + + .cloud_q_recvd_to_processed = { .name = "aclk_cloud_q_recvd_to_processed", + .prio = 200007, + .st = NULL, + .rd_avg = NULL, + .rd_max = NULL, + .rd_total = NULL, + .unit = "us", + .title = "Time from receiving the Cloud Query until it was picked up " + "by query thread (just before passing to the database)." } +}; + +void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement) +{ + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + if (metric->max < measurement) + metric->max = measurement; + + metric->total += measurement; + metric->count++; + ACLK_STATS_UNLOCK; + } +} + static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent) { static RRDSET *st_aclkstats = NULL; @@ -61,33 +106,6 @@ static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample) rrdset_done(st_query_thread); } -#ifdef NETDATA_INTERNAL_CHECKS -static void aclk_stats_latency(struct aclk_metrics_per_sample *per_sample) -{ - static RRDSET *st = NULL; - static RRDDIM *rd_avg = NULL; - static RRDDIM *rd_max = NULL; - - if (unlikely(!st)) { - st = rrdset_create_localhost( - "netdata", "aclk_latency_mqtt", NULL, "aclk", NULL, "ACLK Message Publish Latency", "ms", - "netdata", "stats", 200002, localhost->rrd_update_every, RRDSET_TYPE_LINE); - - rd_avg = rrddim_add(st, "avg", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - rd_max = rrddim_add(st, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); - } else - rrdset_next(st); - if(per_sample->latency_count) - rrddim_set_by_pointer(st, rd_avg, roundf((float)per_sample->latency_total / per_sample->latency_count)); - else - rrddim_set_by_pointer(st, rd_avg, 0); - - rrddim_set_by_pointer(st, rd_max, per_sample->latency_max); - - rrdset_done(st); -} -#endif - static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample) { static RRDSET *st = NULL; @@ -181,32 +199,27 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread) rrdset_done(st); } -static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample) +static void aclk_stats_mat_metric_process(struct aclk_metric_mat *metric, struct aclk_metric_mat_data *data) { - static RRDSET *st = NULL; - static RRDDIM *rd_rq_avg = NULL; - static RRDDIM *rd_rq_max = NULL; - static RRDDIM *rd_rq_total = NULL; - - if (unlikely(!st)) { - st = rrdset_create_localhost( - "netdata", "aclk_query_time", NULL, "aclk", NULL, "Time it took to process cloud requested DB queries", "us", - "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_LINE); - - rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); - rd_rq_total = rrddim_add(st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + if(unlikely(!metric->st)) { + metric->st = rrdset_create_localhost( + "netdata", metric->name, NULL, "aclk", NULL, metric->title, metric->unit, "netdata", "stats", metric->prio, + localhost->rrd_update_every, RRDSET_TYPE_LINE); + + metric->rd_avg = rrddim_add(metric->st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + metric->rd_max = rrddim_add(metric->st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + metric->rd_total = rrddim_add(metric->st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); } else - rrdset_next(st); + rrdset_next(metric->st); - if(per_sample->cloud_q_process_count) - rrddim_set_by_pointer(st, rd_rq_avg, roundf((float)per_sample->cloud_q_process_total / per_sample->cloud_q_process_count)); + if(data->count) + rrddim_set_by_pointer(metric->st, metric->rd_avg, roundf((float)data->total / data->count)); else - rrddim_set_by_pointer(st, rd_rq_avg, 0); - rrddim_set_by_pointer(st, rd_rq_max, per_sample->cloud_q_process_max); - rrddim_set_by_pointer(st, rd_rq_total, per_sample->cloud_q_process_total); + rrddim_set_by_pointer(metric->st, metric->rd_avg, 0); + rrddim_set_by_pointer(metric->st, metric->rd_max, data->max); + rrddim_set_by_pointer(metric->st, metric->rd_total, data->total); - rrdset_done(st); + rrdset_done(metric->st); } void aclk_stats_thread_cleanup() @@ -255,16 +268,18 @@ void *aclk_stats_main_thread(void *ptr) aclk_stats_collect(&per_sample, &permanent); aclk_stats_query_queue(&per_sample); -#ifdef NETDATA_INTERNAL_CHECKS - aclk_stats_latency(&per_sample); -#endif + aclk_stats_write_q(&per_sample); aclk_stats_read_q(&per_sample); aclk_stats_cloud_req(&per_sample); aclk_stats_query_threads(aclk_queries_per_thread_sample); - aclk_stats_query_time(&per_sample); +#ifdef NETDATA_INTERNAL_CHECKS + aclk_stats_mat_metric_process(&aclk_mat_metrics.latency, &per_sample.latency); +#endif + aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_db_query_time, &per_sample.cloud_q_db_query_time); + aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_recvd_to_processed, &per_sample.cloud_q_recvd_to_processed); } return 0; diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h index 7ef3e337c6..49ea473485 100644 --- a/aclk/aclk_stats.h +++ b/aclk/aclk_stats.h @@ -26,6 +26,35 @@ struct aclk_metrics { volatile uint8_t online; }; +//mat = max average total +struct aclk_metric_mat_data { + volatile uint32_t total; + volatile uint32_t count; + volatile uint32_t max; +}; + +//mat = max average total +struct aclk_metric_mat { + char *name; + char *title; + RRDSET *st; + RRDDIM *rd_avg; + RRDDIM *rd_max; + RRDDIM *rd_total; + long prio; + char *unit; +}; + +extern struct aclk_mat_metrics { +#ifdef NETDATA_INTERNAL_CHECKS + struct aclk_metric_mat latency; +#endif + struct aclk_metric_mat cloud_q_db_query_time; + struct aclk_metric_mat cloud_q_recvd_to_processed; +} aclk_mat_metrics; + +void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement); + // reset to 0 on every sample extern struct aclk_metrics_per_sample { /* in the unlikely event of ACLK disconnecting @@ -37,12 +66,6 @@ extern struct aclk_metrics_per_sample { volatile uint32_t queries_queued; volatile uint32_t queries_dispatched; -#ifdef NETDATA_INTERNAL_CHECKS - volatile uint32_t latency_max; - volatile uint32_t latency_total; - volatile uint32_t latency_count; -#endif - volatile uint32_t write_q_added; volatile uint32_t write_q_consumed; @@ -52,9 +75,11 @@ extern struct aclk_metrics_per_sample { volatile uint32_t cloud_req_recvd; volatile uint32_t cloud_req_err; - volatile uint32_t cloud_q_process_total; - volatile uint32_t cloud_q_process_count; - volatile uint32_t cloud_q_process_max; +#ifdef NETDATA_INTERNAL_CHECKS + struct aclk_metric_mat_data latency; +#endif + struct aclk_metric_mat_data cloud_q_db_query_time; + struct aclk_metric_mat_data cloud_q_recvd_to_processed; } aclk_metrics_per_sample; extern uint32_t *aclk_queries_per_thread; diff --git a/aclk/mqtt.c b/aclk/mqtt.c index b52d681711..58534d5d1b 100644 --- a/aclk/mqtt.c +++ b/aclk/mqtt.c @@ -44,15 +44,7 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc) info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff); - if (aclk_stats_enabled) { - ACLK_STATS_LOCK; - if (aclk_metrics_per_sample.latency_max < diff) - aclk_metrics_per_sample.latency_max = diff; - - aclk_metrics_per_sample.latency_total += diff; - aclk_metrics_per_sample.latency_count++; - ACLK_STATS_UNLOCK; - } + aclk_metric_mat_update(&aclk_metrics_per_sample.latency, diff); #endif return; } |