summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorStelios Fragkakis <stelios.fragakis@gmail.com>2019-07-28 16:47:44 +0300
committerStelios Fragkakis <stelios.fragakis@gmail.com>2019-07-28 16:47:44 +0300
commit50f07f0db3ff81354a7260e0c1078cd8c5e449df (patch)
tree8465d7acb8050e28b5a8364fbfe3800c05e0977b
parentcbeb3e262a66739a9137fab93701468c1f9f7272 (diff)
Attempt to provide stream statistics as per #4934 request
* The new stream statistics section will be available by enabling (set to yes) the [global] # activate stream statistics = no into stream.conf * Add a new "streams" section under Netdata monitoring * Add a memory chart that attempts to measure the amount of RAM allocated in the master server (charts and dimensions) * Add a "streams" chart that reports the number currently incoming streams (slaves) * Add a "processors" chart that reports the number of active processors of all slaves Incercepting the active processors HOST variable * Add a "metrics" chart to count all the dimensions provided by the slaves * Add a "processes" chart for active processes reported by the slaves Intercepting "system.active_processes" dimension
-rw-r--r--collectors/plugins.d/plugins_d.c327
-rw-r--r--collectors/plugins.d/plugins_d.h11
-rw-r--r--streaming/rrdpush.c2
3 files changed, 232 insertions, 108 deletions
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index 714a1dd2ec..745751b908 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -2,12 +2,11 @@
#include "plugins_d.h"
+
+extern unsigned int stream_stats_enabled;
char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { NULL };
struct plugind *pluginsd_root = NULL;
-//extern RRDSET *__netdata_stream_st;
-
-
static inline int pluginsd_space(char c) {
switch(c) {
case ' ':
@@ -36,6 +35,34 @@ inline int config_isspace(char c) {
}
}
+inline size_t get_set_memory_usage(RRDSET *st)
+{
+ size_t my_memory;
+
+ if (unlikely(!st))
+ return (size_t) 0;
+
+ my_memory = st->memsize;
+ my_memory += st->plugin_name?strlen(st->plugin_name):0;
+ my_memory += st->module_name?strlen(st->module_name):0;
+ my_memory += st->config_section?strlen(st->config_section):0;
+
+ return (size_t) my_memory;
+}
+
+inline size_t get_dim_memory_usage(RRDDIM *rd)
+{
+ size_t my_memory;
+
+ if (unlikely(!rd))
+ return (size_t) 0;
+
+ my_memory = rd->memsize;
+ my_memory += rd->id?strlen(rd->id):0;
+ my_memory += rd->cache_filename?strlen(rd->cache_filename):0;
+ return (size_t) my_memory;
+}
+
// split a text into words, respecting quotes
static inline int quoted_strings_splitter(char *str, char **words, int max_words, int (*custom_isspace)(char)) {
char *s = str, quote = 0;
@@ -226,7 +253,6 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
}
size_t count = 0;
- size_t total_bytes = 0;
char line[PLUGINSD_LINE_MAX + 1];
@@ -241,7 +267,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
RRDSET *st = NULL;
uint32_t hash;
- unsigned int stream_stats_enabled = 1;
+ unsigned int capture_processes = 0;
errno = 0;
clearerr(fp);
@@ -251,31 +277,113 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
goto cleanup;
}
- RRDSET *__netdata_stream_st = NULL;
-
- if (stream_stats_enabled && host != localhost) {
- __netdata_stream_st = rrdset_create_localhost(
- "netdata"
- , host->hostname
- , host->hostname
- , "streams"
- , NULL
- , "NetData Client Stream"
- , "RAM (MB)"
- , "netdata"
- , "stats"
- , 130000
- , localhost->rrd_update_every
- , RRDSET_TYPE_LINE
- );
+ static collected_number total_streams = 0; // total streams we are collecting data from
+ static collected_number total_ram = 0; // total ram consumed for streams we collect from
+ collected_number session_ram = 0; // RAM for one stream
+ static collected_number total_metrics = 0; // total metrics we collect(dimensions)
+ collected_number session_metrics = 0; // session metrics
+ static collected_number total_processes = 0; // total processes in slaves
+ collected_number session_processes = 0; // session processes
+ static collected_number total_processors = 0; // Total processors as reported by incoming streams (HOST variable active_processors)
+ collected_number session_processors = 0; // -- For session
+ static RRDSET *__stream_mem_st = NULL; // Chart for memory usage
+ static RRDSET *__stream_cnt_st = NULL; // for slave count
+ static RRDSET *__stream_prs_st = NULL; // for active processors
+ static RRDSET *__stream_metrics_st = NULL; // for active metrics we collect from slaves
+ static RRDSET *__stream_processes_st = NULL; // for total active processes in slaves
+ static time_t stream_updated = 0; // Used so that we Do not rrdset_done in less than a second
+ time_t thread_start_time;
+ static int stream_stats_activated = 0;
+
+ if (likely(stream_stats_enabled) && likely(!stream_stats_activated) && likely(host != localhost)) {
+ stream_stats_activated = 1;
+
+ __stream_mem_st = rrdset_create_localhost(
+ "netdata"
+ , "memory"
+ , NULL
+ , "streams"
+ , NULL
+ , "NetData Incoming Streams"
+ , "MB"
+ , "netdata"
+ , "stats"
+ , 130000
+ , localhost->rrd_update_every
+ , RRDSET_TYPE_LINE
+ );
+ __stream_cnt_st = rrdset_create_localhost(
+ "netdata"
+ , "stream count"
+ , "streams"
+ , "streams"
+ , NULL
+ , "NetData Incoming Streams"
+ , "Count"
+ , "netdata"
+ , "stats"
+ , 130001
+ , localhost->rrd_update_every
+ , RRDSET_TYPE_LINE
+ );
+ __stream_prs_st = rrdset_create_localhost(
+ "netdata"
+ , "active processors"
+ , "streams"
+ , "streams"
+ , NULL
+ , "NetData Incoming Streams"
+ , "Count"
+ , "netdata"
+ , "stats"
+ , 130002
+ , localhost->rrd_update_every
+ , RRDSET_TYPE_LINE
+ );
+ __stream_metrics_st = rrdset_create_localhost(
+ "netdata"
+ , "metrics"
+ , "streams"
+ , "streams"
+ , NULL
+ , "NetData Incoming Streams"
+ , "Count"
+ , "netdata"
+ , "stats"
+ , 130003
+ , localhost->rrd_update_every
+ , RRDSET_TYPE_LINE
+ );
+ __stream_processes_st = rrdset_create_localhost(
+ "netdata"
+ , "processes"
+ , "streams"
+ , "streams"
+ , NULL
+ , "NetData Incoming Streams"
+ , "Count"
+ , "netdata"
+ , "stats"
+ , 130004
+ , localhost->rrd_update_every
+ , RRDSET_TYPE_LINE
+ );
+
+ if (likely(__stream_mem_st))
+ rrddim_add(__stream_mem_st, PLUGINSD_STREAM_MEMORY, PLUGINSD_STREAM_MEMORY, 1, 1024 * 1024, RRD_ALGORITHM_ABSOLUTE);
+ if (likely(__stream_cnt_st))
+ rrddim_add(__stream_cnt_st, PLUGINSD_STREAM_COUNT, PLUGINSD_STREAM_COUNT, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ if (likely(__stream_prs_st))
+ rrddim_add(__stream_prs_st, PLUGINSD_STREAM_PROCESSORS, PLUGINSD_STREAM_PROCESSORS, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ if (likely(__stream_metrics_st))
+ rrddim_add(__stream_metrics_st, PLUGINSD_STREAM_METRICS, PLUGINSD_STREAM_METRICS, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ if (likely(__stream_processes_st))
+ rrddim_add(__stream_processes_st, PLUGINSD_STREAM_PROCESSES, PLUGINSD_STREAM_PROCESSES, 1, 1, RRD_ALGORITHM_ABSOLUTE);
}
- if (__netdata_stream_st && host != localhost && stream_stats_enabled) {
- RRDDIM *slave_mem = rrddim_find(__netdata_stream_st, host->hostname);
- if (!slave_mem) {
- rrddim_add(__netdata_stream_st, host->hostname, NULL, 1, 1024 * 1024, RRD_ALGORITHM_ABSOLUTE);
- info("adding dimension %s on streams", host->hostname);
- }
+ if (likely(host != localhost) && likely(stream_stats_enabled)) {
+ total_streams++;
+ thread_start_time = now_realtime_sec();
}
#ifdef ENABLE_HTTPS
@@ -365,8 +473,18 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
enabled = 0;
break;
}
- else
- rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0));
+ else {
+ collected_number v = strtoll(value, NULL, 0);
+ rrddim_set_by_pointer(st, rd, v);
+ if (likely(host != localhost) && likely(stream_stats_enabled) && capture_processes) {
+ // extra sanity check
+ if (strcmp(dimension,"active") == 0) {
+ session_processes += v;
+ total_processes += v;
+ }
+ capture_processes = 0;
+ }
+ }
}
}
else if(likely(hash == BEGIN_HASH && !strcmp(s, PLUGINSD_KEYWORD_BEGIN))) {
@@ -399,12 +517,13 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
else rrdset_next(st);
}
- //info("Total bytes = %ld, reseting", total_bytes);
-
- //total_bytes = 0;
-
- if (__netdata_stream_st)
- rrdset_next(__netdata_stream_st);
+ if (likely(host != localhost) && likely(stream_stats_enabled)) {
+ capture_processes = (strcmp(id,"system.active_processes") == 0);
+ if (capture_processes) {
+ total_processes -= session_processes;
+ session_processes = 0;
+ }
+ }
}
else if(likely(hash == END_HASH && !strcmp(s, PLUGINSD_KEYWORD_END))) {
if(unlikely(!st)) {
@@ -416,19 +535,37 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
debug(D_PLUGINSD, "requested an END on chart %s", st->id);
- if (__netdata_stream_st && host != localhost) {
- RRDDIM *rd_exists = rrddim_find(__netdata_stream_st, host->hostname);
- if (rd_exists) {
- //char name[RRDVAR_MAX_LENGTH + 1];
- //snprintfz(name, RRDVAR_MAX_LENGTH, "%s.memory", host->hostname);
+ if (likely(stream_stats_enabled) && (now_realtime_sec() - stream_updated > 0)) {
+ if (likely(__stream_mem_st)) {
+ rrddim_set(__stream_mem_st, PLUGINSD_STREAM_MEMORY, (collected_number) total_ram);
+ //rrdset_next(__stream_mem_st);
+ rrdset_done(__stream_mem_st);
+ }
+
+ if (likely(__stream_cnt_st)) {
+ rrddim_set(__stream_cnt_st, PLUGINSD_STREAM_COUNT, (collected_number) total_streams);
+ //rrdset_next(__stream_cnt_st);
+ rrdset_done(__stream_cnt_st);
+ }
+
+ if (likely(__stream_prs_st)) {
+ rrddim_set(__stream_prs_st, PLUGINSD_STREAM_PROCESSORS, (collected_number) total_processors);
+ //rrdset_next(__stream_prs_st);
+ rrdset_done(__stream_prs_st);
+ }
+
+ if (likely(__stream_metrics_st)) {
+ rrddim_set(__stream_metrics_st, PLUGINSD_STREAM_METRICS, (collected_number) total_metrics);
+ //rrdset_next(__stream_metrics_st);
+ rrdset_done(__stream_metrics_st);
+ }
- //RRDSETVAR *rs = rrdsetvar_custom_chart_variable_create(__netdata_stream_st, name);
- //if (rs)
- //rrddim_set(__netdata_stream_st, host->hostname, (collected_number) *(calculated_number *) rs->value);
- rrddim_set(__netdata_stream_st, host->hostname, (collected_number) total_bytes);
- }
- rrdset_next(__netdata_stream_st);
- rrdset_done(__netdata_stream_st);
+ if (likely(__stream_processes_st)) {
+ rrddim_set(__stream_processes_st, PLUGINSD_STREAM_PROCESSES, (collected_number) total_processes);
+ //rrdset_next(__stream_processes_st);
+ rrdset_done(__stream_processes_st);
+ }
+ stream_updated = now_realtime_sec();
}
rrdset_done(st);
@@ -506,11 +643,11 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
, priority
, update_every
);
-
- //char fullid[RRD_ID_LENGTH_MAX + 1];
- //snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
- //RRDSET *exists_st = rrdset_find(host, fullid);
+ char fullid[RRD_ID_LENGTH_MAX + 1];
+ snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
+ RRDSET *old_st = rrdset_find(host, fullid);
+
st = rrdset_create(
host
, type
@@ -527,30 +664,12 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
, chart_type
);
- total_bytes += st->memsize;
-
- /*{
- if (__netdata_stream_st && host != localhost && !exists_st) {
- RRDDIM *rd_exists = rrddim_find(__netdata_stream_st, host->hostname);
- if (rd_exists) {
- char name[RRDVAR_MAX_LENGTH + 1];
- snprintfz(name, RRDVAR_MAX_LENGTH, "%s.memory", host->hostname);
- //calculated_number v = (calculated_number) st->memsize;
- RRDSETVAR *rs = rrdsetvar_custom_chart_variable_create(__netdata_stream_st, name);
- if (rs) {
- //rrdsetvar_custom_chart_variable_set(rs, v);
- rrdsetvar_custom_chart_variable_set(rs, (calculated_number) *((calculated_number *) rs->value) + (calculated_number) st->memsize);
- info("Updated variable %s with value %Lf for dimension %s %s", name,
- (calculated_number) *((calculated_number *) rs->value) + (calculated_number) st->memsize,
- st->id, id);
- }
- else {
- error("cannot find/create variable '%s' for incoming stream on host '%s'", name, host->hostname);
- }
- }
- }
- }*/
-
+ if (likely(host != localhost) && likely(stream_stats_enabled)
+ && (old_st != st || ((now_realtime_sec() - thread_start_time) < PLUGINSD_STREAM_INIT_ALLOW_TIME))) {
+ size_t compute_size = get_set_memory_usage(st);
+ session_ram += compute_size; // size for current stream
+ total_ram += compute_size; // keep size for all streams
+ }
if(options && *options) {
if(strstr(options, "obsolete"))
@@ -619,27 +738,18 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
, divisor
, options?options:""
);
-
- //RRDDIM *rd_exists = rrddim_find(st, id);
+
+ RRDDIM *old_rd = rrddim_find(st, id);
RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
- total_bytes += rd->memsize;
-
- // calculate memory
- /*if (__netdata_stream_st && host != localhost && !rd_exists) {
- char name[RRDVAR_MAX_LENGTH + 1];
- snprintfz(name, RRDVAR_MAX_LENGTH, "%s.memory", host->hostname);
- RRDSETVAR *rs = rrdsetvar_custom_chart_variable_create(__netdata_stream_st, name);
- if (rs) {
- rrdsetvar_custom_chart_variable_set(rs, (calculated_number) *((calculated_number *) rs->value) + (calculated_number) rd->memsize);
- //info("Updated variable %s with value %Lf for dimension %s %s", name,
- // (calculated_number) *((calculated_number *) rs->value) + (calculated_number) rd->memsize,
- // st->id, id);
- }
- else {
- error("cannot find/create variable '%s' for incoming stream on host '%s'", name, host->hostname);
- }
- }*/
+ if (likely(host != localhost) && likely(stream_stats_enabled)
+ && (old_rd != rd || ((now_realtime_sec() - thread_start_time) < PLUGINSD_STREAM_INIT_ALLOW_TIME))) {
+ size_t compute_size = get_dim_memory_usage(rd);
+ session_ram += compute_size;
+ total_ram += compute_size;
+ session_metrics++;
+ total_metrics++;
+ }
rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN);
rrddim_flag_clear(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
@@ -696,7 +806,13 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
if(global) {
RRDVAR *rv = rrdvar_custom_host_variable_create(host, name);
- if (rv) rrdvar_custom_host_variable_set(host, rv, v);
+ if (rv) {
+ rrdvar_custom_host_variable_set(host, rv, v);
+ if (likely(host != localhost) && likely(stream_stats_enabled) && strcmp(name,"active_processors") == 0) {
+ session_processors += v;
+ total_processors += v;
+ }
+ }
else error("cannot find/create HOST VARIABLE '%s' on host '%s'", name, host->hostname);
}
else if(st) {
@@ -729,18 +845,13 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
cleanup:
cd->enabled = enabled;
- if (host != localhost) {
- if (__netdata_stream_st) {
- RRDDIM *rd_exists = rrddim_find(__netdata_stream_st, host->hostname);
- if (rd_exists) {
- rrddim_hide(__netdata_stream_st, host->hostname);
- rrddim_is_obsolete(__netdata_stream_st, rd_exists);
- rrddim_free(__netdata_stream_st, rd_exists);
- }
- rrdset_flag_set(__netdata_stream_st, RRDSET_FLAG_HIDDEN);
- rrdset_is_obsolete(__netdata_stream_st);
- }
- }
+ if (likely(host != localhost) && likely(stream_stats_enabled)) {
+ total_streams--;
+ total_ram -= session_ram;
+ total_processors -= session_processors;
+ total_metrics -= session_metrics;
+ total_processes -= session_processes;
+ }
if(likely(count)) {
cd->successful_collections += count;
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index 7d5c7dda47..7f2d35308f 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -34,6 +34,14 @@
#define PLUGINSD_LINE_MAX_SSL_READ 512
#define PLUGINSD_MAX_WORDS 20
+#define PLUGINSD_STREAM_INIT_ALLOW_TIME 120 // Do not check for existing SET or DIMs within these seconds after slave thread init
+
+#define PLUGINSD_STREAM_MEMORY "ram"
+#define PLUGINSD_STREAM_COUNT "streams"
+#define PLUGINSD_STREAM_PROCESSORS "processors"
+#define PLUGINSD_STREAM_METRICS "metrics"
+#define PLUGINSD_STREAM_PROCESSES "processes"
+
#define PLUGINSD_MAX_DIRECTORIES 20
extern char *plugin_directories[PLUGINSD_MAX_DIRECTORIES];
@@ -73,4 +81,7 @@ extern int pluginsd_initialize_plugin_directories();
extern int config_isspace(char c);
+extern size_t get_set_memory_usage(RRDSET *st);
+extern size_t get_dim_memory_usage(RRDDIM *rd);
+
#endif /* NETDATA_PLUGINS_D_H */
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index c798cd1b19..7acff57ed7 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -44,6 +44,7 @@ static struct config stream_config = {
}
};
+unsigned int stream_stats_enabled = 0;
unsigned int default_rrdpush_enabled = 0;
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
@@ -76,6 +77,7 @@ int rrdpush_init() {
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*");
rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
+ stream_stats_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_GLOBAL, "activate stream statistics", 0);
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
error("STREAM [send]: cannot enable sending thread - information is missing.");