summaryrefslogtreecommitdiffstats
path: root/collectors/statsd.plugin
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-05-09 16:34:31 +0300
committerGitHub <noreply@github.com>2022-05-09 16:34:31 +0300
commiteb216a1f4bbb26e1f18537b30d22e8ad8711f42c (patch)
tree353938a0f71da7b04d4f9b67769d2a38ba6db2cb /collectors/statsd.plugin
parent0b3ee50c76dcc3b8dcdd13cec0e432394d3c6964 (diff)
Workers utilization charts (#12807)
* initial version of worker utilization * working example * without mutexes * monitoring DBENGINE, ACLKSYNC, WEB workers * added charts to monitor worker usage * fixed charts units * updated contexts * updated priorities * added documentation * converted threads to stacked chart * One query per query thread * Revert "One query per query thread" This reverts commit 6aeb391f5987c3c6ba2864b559fd7f0cd64b14d3. * fixed priority for web charts * read worker cpu utilization from proc * read workers cpu utilization via /proc/self/task/PID/stat, so that we have cpu utilization even when the jobs are too long to finish within our update_every frequency * disabled web server cpu utilization monitoring - it is now monitored by worker utilization * tight integration of worker utilization to web server * monitoring statsd worker threads * code cleanup and renaming of variables * contrained worker and statistics conflict to just one variable * support for rendering jobs per type * better priorities and removed the total jobs chart * added busy time in ms per job type * added proc.plugin monitoring, switch clock to MONOTONIC_RAW if available, global statistics now cleans up old worker threads * isolated worker thread families * added cgroups.plugin workers * remove unneeded dimensions when then expected worker is just one * plugins.d and streaming monitoring * rebased; support worker_is_busy() to be called one after another * added diskspace plugin monitoring * added tc.plugin monitoring * added ML threads monitoring * dont create dimensions and charts that are not needed * fix crash when job types are added on the fly * added timex and idlejitter plugins; collected heartbeat statistics; reworked heartbeat according to the POSIX * the right name is heartbeat for this chart * monitor streaming senders * added streaming senders to global stats * prevent division by zero * added clock_init() to external C plugins * added freebsd and macos plugins * added freebsd and macos to global statistics * dont use new as a variable; address compiler warnings on FreeBSD and MacOS * refactored contexts to be unique; added health threads monitoring Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'collectors/statsd.plugin')
-rw-r--r--collectors/statsd.plugin/statsd.c169
1 files changed, 88 insertions, 81 deletions
diff --git a/collectors/statsd.plugin/statsd.c b/collectors/statsd.plugin/statsd.c
index a630d00d0c..f4286ae378 100644
--- a/collectors/statsd.plugin/statsd.c
+++ b/collectors/statsd.plugin/statsd.c
@@ -9,6 +9,15 @@
#define STATSD_LISTEN_PORT 8125
#define STATSD_LISTEN_BACKLOG 4096
+#define WORKER_JOB_TYPE_TCP_CONNECTED 0
+#define WORKER_JOB_TYPE_TCP_DISCONNECTED 1
+#define WORKER_JOB_TYPE_RCV_DATA 2
+#define WORKER_JOB_TYPE_SND_DATA 3
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 4
+#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least 4
+#endif
+
// --------------------------------------------------------------------------------------
// #define STATSD_MULTITHREADED 1
@@ -237,10 +246,6 @@ struct collection_thread_status {
size_t max_sockets;
netdata_thread_t thread;
- struct rusage rusage;
- RRDSET *st_cpu;
- RRDDIM *rd_user;
- RRDDIM *rd_system;
};
static struct statsd {
@@ -788,6 +793,7 @@ static void *statsd_add_callback(POLLINFO *pi, short int *events, void *data) {
(void)pi;
(void)data;
+ worker_is_busy(WORKER_JOB_TYPE_TCP_CONNECTED);
*events = POLLIN;
struct statsd_tcp *t = (struct statsd_tcp *)callocz(sizeof(struct statsd_tcp) + STATSD_TCP_BUFFER_SIZE, 1);
@@ -796,11 +802,14 @@ static void *statsd_add_callback(POLLINFO *pi, short int *events, void *data) {
statsd.tcp_socket_connects++;
statsd.tcp_socket_connected++;
+ worker_is_idle();
return t;
}
// TCP client disconnected
static void statsd_del_callback(POLLINFO *pi) {
+ worker_is_busy(WORKER_JOB_TYPE_TCP_DISCONNECTED);
+
struct statsd_tcp *t = pi->data;
if(likely(t)) {
@@ -818,10 +827,15 @@ static void statsd_del_callback(POLLINFO *pi) {
freez(t);
}
+
+ worker_is_idle();
}
// Receive data
static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
+ int retval = -1;
+ worker_is_busy(WORKER_JOB_TYPE_RCV_DATA);
+
*events = POLLIN;
int fd = pi->fd;
@@ -832,14 +846,16 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
if(unlikely(!d)) {
error("STATSD: internal error: expected TCP data pointer is NULL");
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
#ifdef NETDATA_INTERNAL_CHECKS
if(unlikely(d->type != STATSD_SOCKET_DATA_TYPE_TCP)) {
error("STATSD: internal error: socket data type should be %d, but it is %d", (int)STATSD_SOCKET_DATA_TYPE_TCP, (int)d->type);
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
#endif
@@ -872,8 +888,10 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
d->len = statsd_process(d->buffer, d->len, 1);
}
- if(unlikely(ret == -1))
- return -1;
+ if(unlikely(ret == -1)) {
+ retval = -1;
+ goto cleanup;
+ }
} while (rc != -1);
break;
@@ -884,14 +902,16 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
if(unlikely(!d)) {
error("STATSD: internal error: expected UDP data pointer is NULL");
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
#ifdef NETDATA_INTERNAL_CHECKS
if(unlikely(d->type != STATSD_SOCKET_DATA_TYPE_UDP)) {
error("STATSD: internal error: socket data should be %d, but it is %d", (int)d->type, (int)STATSD_SOCKET_DATA_TYPE_UDP);
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
#endif
@@ -904,7 +924,8 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
if (errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) {
error("STATSD: recvmmsg() on UDP socket %d failed.", fd);
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
} else if (rc) {
// data received
@@ -929,7 +950,8 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
if (errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) {
error("STATSD: recv() on UDP socket %d failed.", fd);
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
} else if (rc) {
// data received
@@ -947,24 +969,26 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
default: {
error("STATSD: internal error: unknown socktype %d on socket %d", pi->socktype, fd);
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
}
- return 0;
+ retval = 0;
+cleanup:
+ worker_is_idle();
+ return retval;
}
static int statsd_snd_callback(POLLINFO *pi, short int *events) {
(void)pi;
(void)events;
+ worker_is_busy(WORKER_JOB_TYPE_SND_DATA);
error("STATSD: snd_callback() called, but we never requested to send data to statsd clients.");
- return -1;
-}
+ worker_is_idle();
-static void statsd_timer_callback(void *timer_data) {
- struct collection_thread_status *status = timer_data;
- getrusage(RUSAGE_THREAD, &status->rusage);
+ return -1;
}
// --------------------------------------------------------------------------------------------------------------------
@@ -986,12 +1010,19 @@ void statsd_collector_thread_cleanup(void *data) {
#endif
freez(d);
+ worker_unregister();
}
void *statsd_collector_thread(void *ptr) {
struct collection_thread_status *status = ptr;
status->status = 1;
+ worker_register("STATSD");
+ worker_register_job_name(WORKER_JOB_TYPE_TCP_CONNECTED, "tcp connect");
+ worker_register_job_name(WORKER_JOB_TYPE_TCP_DISCONNECTED, "tcp disconnect");
+ worker_register_job_name(WORKER_JOB_TYPE_RCV_DATA, "receive");
+ worker_register_job_name(WORKER_JOB_TYPE_SND_DATA, "send");
+
info("STATSD collector thread started with taskid %d", gettid());
struct statsd_udp *d = callocz(sizeof(struct statsd_udp), 1);
@@ -1019,7 +1050,7 @@ void *statsd_collector_thread(void *ptr) {
, statsd_del_callback
, statsd_rcv_callback
, statsd_snd_callback
- , statsd_timer_callback
+ , NULL
, NULL // No access control pattern
, 0 // No dns lookups for access control pattern
, (void *)d
@@ -2147,9 +2178,32 @@ static void statsd_main_cleanup(void *data) {
info("STATSD: cleanup completed.");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+
+ worker_unregister();
}
+#define WORKER_STATSD_FLUSH_GAUGES 0
+#define WORKER_STATSD_FLUSH_COUNTERS 1
+#define WORKER_STATSD_FLUSH_METERS 2
+#define WORKER_STATSD_FLUSH_TIMERS 3
+#define WORKER_STATSD_FLUSH_HISTOGRAMS 4
+#define WORKER_STATSD_FLUSH_SETS 5
+#define WORKER_STATSD_FLUSH_STATS 6
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 7
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 6
+#endif
+
void *statsd_main(void *ptr) {
+ worker_register("STATSDFLUSH");
+ worker_register_job_name(WORKER_STATSD_FLUSH_GAUGES, "gauges");
+ worker_register_job_name(WORKER_STATSD_FLUSH_COUNTERS, "counters");
+ worker_register_job_name(WORKER_STATSD_FLUSH_METERS, "meters");
+ worker_register_job_name(WORKER_STATSD_FLUSH_TIMERS, "timers");
+ worker_register_job_name(WORKER_STATSD_FLUSH_HISTOGRAMS, "histograms");
+ worker_register_job_name(WORKER_STATSD_FLUSH_SETS, "sets");
+ worker_register_job_name(WORKER_STATSD_FLUSH_STATS, "statistics");
+
netdata_thread_cleanup_push(statsd_main_cleanup, ptr);
// ----------------------------------------------------------------------------------------------------------------
@@ -2420,71 +2474,37 @@ void *statsd_main(void *ptr) {
);
RRDDIM *rd_pcharts = rrddim_add(st_pcharts, "charts", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- RRDSET *stcpu_thread = rrdset_create_localhost(
- "netdata"
- , "plugin_statsd_charting_cpu"
- , NULL
- , "statsd"
- , "netdata.statsd_cpu"
- , "Netdata statsd charting thread CPU usage"
- , "milliseconds/s"
- , PLUGIN_STATSD_NAME
- , "stats"
- , 132001
- , statsd.update_every
- , RRDSET_TYPE_STACKED
- );
-
- RRDDIM *rd_user = rrddim_add(stcpu_thread, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- RRDDIM *rd_system = rrddim_add(stcpu_thread, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- struct rusage thread;
-
- for(i = 0; i < statsd.threads ;i++) {
- char id[100 + 1];
- char title[100 + 1];
-
- snprintfz(id, 100, "plugin_statsd_collector%d_cpu", i + 1);
- snprintfz(title, 100, "Netdata statsd collector thread No %d CPU usage", i + 1);
-
- statsd.collection_threads_status[i].st_cpu = rrdset_create_localhost(
- "netdata"
- , id
- , NULL
- , "statsd"
- , "netdata.statsd_cpu"
- , title
- , "milliseconds/s"
- , PLUGIN_STATSD_NAME
- , "stats"
- , 132002 + i
- , statsd.update_every
- , RRDSET_TYPE_STACKED
- );
-
- statsd.collection_threads_status[i].rd_user = rrddim_add(statsd.collection_threads_status[i].st_cpu, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- statsd.collection_threads_status[i].rd_system = rrddim_add(statsd.collection_threads_status[i].st_cpu, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- }
-
- // ----------------------------------------------------------------------------------------------------------------
+ // ----------------------------------------------------------------------------------------------------------------
// statsd thread to turn metrics into charts
usec_t step = statsd.update_every * USEC_PER_SEC;
heartbeat_t hb;
heartbeat_init(&hb);
while(!netdata_exit) {
+ worker_is_idle();
usec_t hb_dt = heartbeat_next(&hb, step);
+ worker_is_busy(WORKER_STATSD_FLUSH_GAUGES);
statsd_flush_index_metrics(&statsd.gauges, statsd_flush_gauge);
+
+ worker_is_busy(WORKER_STATSD_FLUSH_COUNTERS);
statsd_flush_index_metrics(&statsd.counters, statsd_flush_counter);
+
+ worker_is_busy(WORKER_STATSD_FLUSH_METERS);
statsd_flush_index_metrics(&statsd.meters, statsd_flush_meter);
+
+ worker_is_busy(WORKER_STATSD_FLUSH_TIMERS);
statsd_flush_index_metrics(&statsd.timers, statsd_flush_timer);
+
+ worker_is_busy(WORKER_STATSD_FLUSH_HISTOGRAMS);
statsd_flush_index_metrics(&statsd.histograms, statsd_flush_histogram);
+
+ worker_is_busy(WORKER_STATSD_FLUSH_SETS);
statsd_flush_index_metrics(&statsd.sets, statsd_flush_set);
+ worker_is_busy(WORKER_STATSD_FLUSH_STATS);
statsd_update_all_app_charts();
- getrusage(RUSAGE_THREAD, &thread);
-
if(unlikely(netdata_exit))
break;
@@ -2498,9 +2518,6 @@ void *statsd_main(void *ptr) {
rrdset_next(st_tcp_connects);
rrdset_next(st_tcp_connected);
rrdset_next(st_pcharts);
- rrdset_next(stcpu_thread);
- for(i = 0; i < statsd.threads ;i++)
- rrdset_next(statsd.collection_threads_status[i].st_cpu);
}
rrddim_set_by_pointer(st_metrics, rd_metrics_gauge, (collected_number)statsd.gauges.metrics);
@@ -2550,16 +2567,6 @@ void *statsd_main(void *ptr) {
rrddim_set_by_pointer(st_pcharts, rd_pcharts, (collected_number)statsd.private_charts);
rrdset_done(st_pcharts);
-
- rrddim_set_by_pointer(stcpu_thread, rd_user, thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec);
- rrddim_set_by_pointer(stcpu_thread, rd_system, thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec);
- rrdset_done(stcpu_thread);
-
- for(i = 0; i < statsd.threads ;i++) {
- rrddim_set_by_pointer(statsd.collection_threads_status[i].st_cpu, statsd.collection_threads_status[i].rd_user, statsd.collection_threads_status[i].rusage.ru_utime.tv_sec * 1000000ULL + statsd.collection_threads_status[i].rusage.ru_utime.tv_usec);
- rrddim_set_by_pointer(statsd.collection_threads_status[i].st_cpu, statsd.collection_threads_status[i].rd_system, statsd.collection_threads_status[i].rusage.ru_stime.tv_sec * 1000000ULL + statsd.collection_threads_status[i].rusage.ru_stime.tv_usec);
- rrdset_done(statsd.collection_threads_status[i].st_cpu);
- }
}
cleanup: ; // added semi-colon to prevent older gcc error: label at end of compound statement