diff options
-rw-r--r-- | aclk/aclk.c | 4 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.c | 16 | ||||
-rw-r--r-- | aclk/aclk_rx_msgs.h | 3 | ||||
-rw-r--r-- | aclk/aclk_stats.c | 82 | ||||
-rw-r--r-- | aclk/aclk_stats.h | 5 |
5 files changed, 95 insertions, 15 deletions
diff --git a/aclk/aclk.c b/aclk/aclk.c index 43f7559748..a2159d9179 100644 --- a/aclk/aclk.c +++ b/aclk/aclk.c @@ -761,8 +761,9 @@ void *aclk_main(void *ptr) return NULL; } + unsigned int proto_hdl_cnt; #ifdef ENABLE_NEW_CLOUD_PROTOCOL - aclk_init_rx_msg_handlers(); + proto_hdl_cnt = aclk_init_rx_msg_handlers(); #endif // This thread is unusual in that it cannot be cancelled by cancel_main_threads() @@ -802,6 +803,7 @@ void *aclk_main(void *ptr) stats_thread = callocz(1, sizeof(struct aclk_stats_thread)); stats_thread->thread = mallocz(sizeof(netdata_thread_t)); stats_thread->query_thread_count = query_threads.count; + aclk_stats_thread_prepare(query_threads.count, proto_hdl_cnt); netdata_thread_create( stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread, stats_thread); diff --git a/aclk/aclk_rx_msgs.c b/aclk/aclk_rx_msgs.c index ecb2b4179d..1f2cb27ef3 100644 --- a/aclk/aclk_rx_msgs.c +++ b/aclk/aclk_rx_msgs.c @@ -457,9 +457,15 @@ new_cloud_rx_msg_t *find_rx_handler_by_hash(simple_hash_t hash) return NULL; } -void aclk_init_rx_msg_handlers(void) +const char *rx_handler_get_name(size_t i) { - for (int i = 0; rx_msgs[i].fnc; i++) { + return rx_msgs[i].name; +} + +unsigned int aclk_init_rx_msg_handlers(void) +{ + int i; + for (i = 0; rx_msgs[i].fnc; i++) { simple_hash_t hash = simple_hash(rx_msgs[i].name); new_cloud_rx_msg_t *hdl = find_rx_handler_by_hash(hash); if (unlikely(hdl)) { @@ -469,6 +475,7 @@ void aclk_init_rx_msg_handlers(void) } rx_msgs[i].name_hash = hash; } + return i; } void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len) @@ -489,6 +496,11 @@ void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t } return; } + if (aclk_stats_enabled) { + ACLK_STATS_LOCK; + aclk_proto_rx_msgs_sample[msg_descriptor-rx_msgs]++; + ACLK_STATS_UNLOCK; + } if (msg_descriptor->fnc(msg, msg_len)) { error("Error processing message of type '%s'", message_type); if (aclk_stats_enabled) { diff --git a/aclk/aclk_rx_msgs.h b/aclk/aclk_rx_msgs.h index 38243a4c93..00f88c6a8d 100644 --- a/aclk/aclk_rx_msgs.h +++ b/aclk/aclk_rx_msgs.h @@ -11,7 +11,8 @@ int aclk_handle_cloud_cmd_message(char *payload); #ifdef ENABLE_NEW_CLOUD_PROTOCOL -void aclk_init_rx_msg_handlers(void); +const char *rx_handler_get_name(size_t i); +unsigned int aclk_init_rx_msg_handlers(void); void aclk_handle_new_cloud_msg(const char *message_type, const char *msg, size_t msg_len); #endif diff --git a/aclk/aclk_stats.c b/aclk/aclk_stats.c index a75f3b2cbe..a9f0a923c1 100644 --- a/aclk/aclk_stats.c +++ b/aclk/aclk_stats.c @@ -6,7 +6,14 @@ netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER; -int query_thread_count; +struct { + int query_thread_count; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + unsigned int proto_hdl_cnt; + uint32_t *aclk_proto_rx_msgs_sample; + RRDDIM **rx_msg_dims; +#endif +} aclk_stats_cfg; // there is only 1 stats thread at a time // data ACLK stats need per query thread struct aclk_qt_data { @@ -15,6 +22,7 @@ struct aclk_qt_data { uint32_t *aclk_queries_per_thread = NULL; uint32_t *aclk_queries_per_thread_sample = NULL; +uint32_t *aclk_proto_rx_msgs_sample = NULL; struct aclk_metrics aclk_metrics = { .online = 0, @@ -186,7 +194,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread) "netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s", "netdata", "stats", 200009, localhost->rrd_update_every, RRDSET_TYPE_STACKED); - for (int i = 0; i < query_thread_count; i++) { + for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) { if (snprintfz(dim_name, MAX_DIM_NAME, "Query %d", i) < 0) error("snprintf encoding error"); aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); @@ -194,7 +202,7 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread) } else rrdset_next(st); - for (int i = 0; i < query_thread_count; i++) { + for (int i = 0; i < aclk_stats_cfg.query_thread_count; i++) { rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]); } @@ -229,8 +237,57 @@ static void aclk_stats_query_time(struct aclk_metrics_per_sample *per_sample) rrdset_done(st); } +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +const char *rx_handler_get_name(size_t i); +static void aclk_stats_newproto_rx(uint32_t *rx_msgs_sample) +{ + static RRDSET *st = NULL; + + if (unlikely(!st)) { + st = rrdset_create_localhost( + "netdata", "aclk_protobuf_rx_types", NULL, "aclk", NULL, "Received new cloud architecture messages by their type.", "msg/s", + "netdata", "stats", 200010, localhost->rrd_update_every, RRDSET_TYPE_STACKED); + + for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++) { + aclk_stats_cfg.rx_msg_dims[i] = rrddim_add(st, rx_handler_get_name(i), NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE); + } + } else + rrdset_next(st); + + for (unsigned int i = 0; i < aclk_stats_cfg.proto_hdl_cnt; i++) + rrddim_set_by_pointer(st, aclk_stats_cfg.rx_msg_dims[i], rx_msgs_sample[i]); + + rrdset_done(st); +} +#endif + +void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt) +{ +#ifndef ENABLE_NEW_CLOUD_PROTOCOL + UNUSED(proto_hdl_cnt); +#endif + + aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); + aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); + aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); + + memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + aclk_stats_cfg.proto_hdl_cnt = proto_hdl_cnt; + aclk_stats_cfg.aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); + aclk_proto_rx_msgs_sample = callocz(proto_hdl_cnt, sizeof(*aclk_proto_rx_msgs_sample)); + aclk_stats_cfg.rx_msg_dims = callocz(proto_hdl_cnt, sizeof(RRDDIM*)); +#endif +} + void aclk_stats_thread_cleanup() { +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + freez(aclk_stats_cfg.rx_msg_dims); + freez(aclk_proto_rx_msgs_sample); + freez(aclk_stats_cfg.aclk_proto_rx_msgs_sample); +#endif freez(aclk_qt_data); freez(aclk_queries_per_thread); freez(aclk_queries_per_thread_sample); @@ -240,17 +297,12 @@ void *aclk_stats_main_thread(void *ptr) { struct aclk_stats_thread *args = ptr; - query_thread_count = args->query_thread_count; - aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data)); - aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t)); - aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t)); + aclk_stats_cfg.query_thread_count = args->query_thread_count; heartbeat_t hb; heartbeat_init(&hb); usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC; - memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); - struct aclk_metrics_per_sample per_sample; struct aclk_metrics permanent; @@ -266,11 +318,15 @@ void *aclk_stats_main_thread(void *ptr) // to not hold lock longer than necessary, especially not to hold it // during database rrd* operations memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample)); +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + memcpy(aclk_stats_cfg.aclk_proto_rx_msgs_sample, aclk_proto_rx_msgs_sample, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); + memset(aclk_proto_rx_msgs_sample, 0, sizeof(*aclk_proto_rx_msgs_sample) * aclk_stats_cfg.proto_hdl_cnt); +#endif memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics)); memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample)); - memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count); - memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count); + memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * aclk_stats_cfg.query_thread_count); + memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * aclk_stats_cfg.query_thread_count); ACLK_STATS_UNLOCK; aclk_stats_collect(&per_sample, &permanent); @@ -286,6 +342,10 @@ void *aclk_stats_main_thread(void *ptr) aclk_stats_query_threads(aclk_queries_per_thread_sample); aclk_stats_query_time(&per_sample); + +#ifdef ENABLE_NEW_CLOUD_PROTOCOL + aclk_stats_newproto_rx(aclk_stats_cfg.aclk_proto_rx_msgs_sample); +#endif } return 0; diff --git a/aclk/aclk_stats.h b/aclk/aclk_stats.h index ca75ec3b64..4f2894798f 100644 --- a/aclk/aclk_stats.h +++ b/aclk/aclk_stats.h @@ -60,9 +60,14 @@ extern struct aclk_metrics_per_sample { volatile uint32_t cloud_q_process_max; } aclk_metrics_per_sample; +#ifdef ENABLE_NEW_CLOUD_PROTOCOL +extern uint32_t *aclk_proto_rx_msgs_sample; +#endif + extern uint32_t *aclk_queries_per_thread; void *aclk_stats_main_thread(void *ptr); +void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt); void aclk_stats_thread_cleanup(); void aclk_stats_upd_online(int online); |