summaryrefslogtreecommitdiffstats
path: root/aclk
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2020-11-06 10:16:20 +0100
committerGitHub <noreply@github.com>2020-11-06 10:16:20 +0100
commit144800420e305dfa352aab47ab324af19b320c83 (patch)
tree2d8c08a6ca4cdea0009b7da122bdacdb7165a6e2 /aclk
parent767afeeb393698f3a8c3d483f7bce32dd468a911 (diff)
Adds metric showing how long Query spent in Queue (#10016)
* implements new ACLK metric * cleans ACLK_stats code a bit
Diffstat (limited to 'aclk')
-rw-r--r--aclk/aclk_query.c23
-rw-r--r--aclk/aclk_stats.c119
-rw-r--r--aclk/aclk_stats.h43
-rw-r--r--aclk/mqtt.c10
4 files changed, 111 insertions, 84 deletions
diff --git a/aclk/aclk_query.c b/aclk/aclk_query.c
index cd1cfd1e3e..5e4b879512 100644
--- a/aclk/aclk_query.c
+++ b/aclk/aclk_query.c
@@ -22,6 +22,7 @@ static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER;
struct aclk_query {
usec_t created;
+ usec_t created_boot_time;
time_t run_after; // Delay run until after this time
ACLK_CMD cmd; // What command is this
char *topic; // Topic to respond to
@@ -233,6 +234,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
new_query->data = data;
new_query->next = NULL;
new_query->created = now_realtime_usec();
+ new_query->created_boot_time = now_boottime_usec();
new_query->run_after = run_after;
debug(D_ACLK, "Added query (%s) (%s)", topic, query ? query : "");
@@ -319,22 +321,15 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli
#pragma region ACLK_QUERY
#endif
-static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url)
+static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created)
{
- usec_t t;
+ usec_t t = now_boottime_usec();
+ aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
- t = now_monotonic_high_precision_usec();
w->response.code = web_client_api_request_v1(host, w, url);
- t = now_monotonic_high_precision_usec() - t;
+ t = now_boottime_usec() - t;
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- aclk_metrics_per_sample.cloud_q_process_total += t;
- aclk_metrics_per_sample.cloud_q_process_count++;
- if (aclk_metrics_per_sample.cloud_q_process_max < t)
- aclk_metrics_per_sample.cloud_q_process_max = t;
- ACLK_STATS_UNLOCK;
- }
+ aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t);
return t;
}
@@ -361,7 +356,7 @@ static int aclk_execute_query(struct aclk_query *this_query)
mysep = strrchr(this_query->query, '/');
// TODO: handle bad response perhaps in a different way. For now it does to the payload
- aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
+ aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
now_realtime_timeval(&w->tv_ready);
w->response.data->date = w->tv_ready.tv_sec;
web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
@@ -427,7 +422,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
mysep = strrchr(this_query->query, '/');
// execute the query
- t = aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
+ t = aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
#ifdef NETDATA_WITH_ZLIB
// check if gzip encoding can and should be used
diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c
index 50378b134c..2a57cd6f0d 100644
--- a/aclk/aclk_stats.c
+++ b/aclk/aclk_stats.c
@@ -20,6 +20,51 @@ struct aclk_metrics aclk_metrics = {
struct aclk_metrics_per_sample aclk_metrics_per_sample;
+struct aclk_mat_metrics aclk_mat_metrics = {
+#ifdef NETDATA_INTERNAL_CHECKS
+ .latency = { .name = "aclk_latency_mqtt",
+ .prio = 200002,
+ .st = NULL,
+ .rd_avg = NULL,
+ .rd_max = NULL,
+ .rd_total = NULL,
+ .unit = "ms",
+ .title = "ACLK Message Publish Latency" },
+#endif
+
+ .cloud_q_db_query_time = { .name = "aclk_db_query_time",
+ .prio = 200006,
+ .st = NULL,
+ .rd_avg = NULL,
+ .rd_max = NULL,
+ .rd_total = NULL,
+ .unit = "us",
+ .title = "Time it took to process cloud requested DB queries" },
+
+ .cloud_q_recvd_to_processed = { .name = "aclk_cloud_q_recvd_to_processed",
+ .prio = 200007,
+ .st = NULL,
+ .rd_avg = NULL,
+ .rd_max = NULL,
+ .rd_total = NULL,
+ .unit = "us",
+ .title = "Time from receiving the Cloud Query until it was picked up "
+ "by query thread (just before passing to the database)." }
+};
+
+void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement)
+{
+ if (aclk_stats_enabled) {
+ ACLK_STATS_LOCK;
+ if (metric->max < measurement)
+ metric->max = measurement;
+
+ metric->total += measurement;
+ metric->count++;
+ ACLK_STATS_UNLOCK;
+ }
+}
+
static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent)
{
static RRDSET *st_aclkstats = NULL;
@@ -61,33 +106,6 @@ static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st_query_thread);
}
-#ifdef NETDATA_INTERNAL_CHECKS
-static void aclk_stats_latency(struct aclk_metrics_per_sample *per_sample)
-{
- static RRDSET *st = NULL;
- static RRDDIM *rd_avg = NULL;
- static RRDDIM *rd_max = NULL;
-
- if (unlikely(!st)) {
- st = rrdset_create_localhost(
- "netdata", "aclk_latency_mqtt", NULL, "aclk", NULL, "ACLK Message Publish Latency", "ms",
- "netdata", "stats", 200002, localhost->rrd_update_every, RRDSET_TYPE_LINE);
-
- rd_avg = rrddim_add(st, "avg", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- rd_max = rrddim_add(st, "max", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- } else
- rrdset_next(st);
- if(per_sample->latency_count)
- rrddim_set_by_pointer(st, rd_avg, roundf((float)per_sample->latency_total / per_sample->latency_count));
- else
- rrddim_set_by_pointer(st, rd_avg, 0);
-
- rrddim_set_by_pointer(st, rd_max, per_sample->latency_max);
-
- rrdset_done(st);
-}
-#endif
-
static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
@@ -181,32 +199,27 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread)
rrdset_done(st);
}
-static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample)
+static void aclk_stats_mat_metric_process(struct aclk_metric_mat *metric, struct aclk_metric_mat_data *data)
{
- static RRDSET *st = NULL;
- static RRDDIM *rd_rq_avg = NULL;
- static RRDDIM *rd_rq_max = NULL;
- static RRDDIM *rd_rq_total = NULL;
-
- if (unlikely(!st)) {
- st = rrdset_create_localhost(
- "netdata", "aclk_query_time", NULL, "aclk", NULL, "Time it took to process cloud requested DB queries", "us",
- "netdata", "stats", 200006, localhost->rrd_update_every, RRDSET_TYPE_LINE);
-
- rd_rq_avg = rrddim_add(st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_rq_max = rrddim_add(st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
- rd_rq_total = rrddim_add(st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ if(unlikely(!metric->st)) {
+ metric->st = rrdset_create_localhost(
+ "netdata", metric->name, NULL, "aclk", NULL, metric->title, metric->unit, "netdata", "stats", metric->prio,
+ localhost->rrd_update_every, RRDSET_TYPE_LINE);
+
+ metric->rd_avg = rrddim_add(metric->st, "avg", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ metric->rd_max = rrddim_add(metric->st, "max", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
+ metric->rd_total = rrddim_add(metric->st, "total", NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
} else
- rrdset_next(st);
+ rrdset_next(metric->st);
- if(per_sample->cloud_q_process_count)
- rrddim_set_by_pointer(st, rd_rq_avg, roundf((float)per_sample->cloud_q_process_total / per_sample->cloud_q_process_count));
+ if(data->count)
+ rrddim_set_by_pointer(metric->st, metric->rd_avg, roundf((float)data->total / data->count));
else
- rrddim_set_by_pointer(st, rd_rq_avg, 0);
- rrddim_set_by_pointer(st, rd_rq_max, per_sample->cloud_q_process_max);
- rrddim_set_by_pointer(st, rd_rq_total, per_sample->cloud_q_process_total);
+ rrddim_set_by_pointer(metric->st, metric->rd_avg, 0);
+ rrddim_set_by_pointer(metric->st, metric->rd_max, data->max);
+ rrddim_set_by_pointer(metric->st, metric->rd_total, data->total);
- rrdset_done(st);
+ rrdset_done(metric->st);
}
void aclk_stats_thread_cleanup()
@@ -255,16 +268,18 @@ void *aclk_stats_main_thread(void *ptr)
aclk_stats_collect(&per_sample, &permanent);
aclk_stats_query_queue(&per_sample);
-#ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_latency(&per_sample);
-#endif
+
aclk_stats_write_q(&per_sample);
aclk_stats_read_q(&per_sample);
aclk_stats_cloud_req(&per_sample);
aclk_stats_query_threads(aclk_queries_per_thread_sample);
- aclk_stats_query_time(&per_sample);
+#ifdef NETDATA_INTERNAL_CHECKS
+ aclk_stats_mat_metric_process(&aclk_mat_metrics.latency, &per_sample.latency);
+#endif
+ aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_db_query_time, &per_sample.cloud_q_db_query_time);
+ aclk_stats_mat_metric_process(&aclk_mat_metrics.cloud_q_recvd_to_processed, &per_sample.cloud_q_recvd_to_processed);
}
return 0;
diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h
index 7ef3e337c6..49ea473485 100644
--- a/aclk/aclk_stats.h
+++ b/aclk/aclk_stats.h
@@ -26,6 +26,35 @@ struct aclk_metrics {
volatile uint8_t online;
};
+//mat = max average total
+struct aclk_metric_mat_data {
+ volatile uint32_t total;
+ volatile uint32_t count;
+ volatile uint32_t max;
+};
+
+//mat = max average total
+struct aclk_metric_mat {
+ char *name;
+ char *title;
+ RRDSET *st;
+ RRDDIM *rd_avg;
+ RRDDIM *rd_max;
+ RRDDIM *rd_total;
+ long prio;
+ char *unit;
+};
+
+extern struct aclk_mat_metrics {
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct aclk_metric_mat latency;
+#endif
+ struct aclk_metric_mat cloud_q_db_query_time;
+ struct aclk_metric_mat cloud_q_recvd_to_processed;
+} aclk_mat_metrics;
+
+void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement);
+
// reset to 0 on every sample
extern struct aclk_metrics_per_sample {
/* in the unlikely event of ACLK disconnecting
@@ -37,12 +66,6 @@ extern struct aclk_metrics_per_sample {
volatile uint32_t queries_queued;
volatile uint32_t queries_dispatched;
-#ifdef NETDATA_INTERNAL_CHECKS
- volatile uint32_t latency_max;
- volatile uint32_t latency_total;
- volatile uint32_t latency_count;
-#endif
-
volatile uint32_t write_q_added;
volatile uint32_t write_q_consumed;
@@ -52,9 +75,11 @@ extern struct aclk_metrics_per_sample {
volatile uint32_t cloud_req_recvd;
volatile uint32_t cloud_req_err;
- volatile uint32_t cloud_q_process_total;
- volatile uint32_t cloud_q_process_count;
- volatile uint32_t cloud_q_process_max;
+#ifdef NETDATA_INTERNAL_CHECKS
+ struct aclk_metric_mat_data latency;
+#endif
+ struct aclk_metric_mat_data cloud_q_db_query_time;
+ struct aclk_metric_mat_data cloud_q_recvd_to_processed;
} aclk_metrics_per_sample;
extern uint32_t *aclk_queries_per_thread;
diff --git a/aclk/mqtt.c b/aclk/mqtt.c
index b52d681711..58534d5d1b 100644
--- a/aclk/mqtt.c
+++ b/aclk/mqtt.c
@@ -44,15 +44,7 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc)
info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff);
- if (aclk_stats_enabled) {
- ACLK_STATS_LOCK;
- if (aclk_metrics_per_sample.latency_max < diff)
- aclk_metrics_per_sample.latency_max = diff;
-
- aclk_metrics_per_sample.latency_total += diff;
- aclk_metrics_per_sample.latency_count++;
- ACLK_STATS_UNLOCK;
- }
+ aclk_metric_mat_update(&aclk_metrics_per_sample.latency, diff);
#endif
return;
}