diff options
author | Stelios Fragkakis <stelios.fragakis@gmail.com> | 2019-07-28 16:47:44 +0300 |
---|---|---|
committer | Stelios Fragkakis <stelios.fragakis@gmail.com> | 2019-07-28 16:47:44 +0300 |
commit | 50f07f0db3ff81354a7260e0c1078cd8c5e449df (patch) | |
tree | 8465d7acb8050e28b5a8364fbfe3800c05e0977b | |
parent | cbeb3e262a66739a9137fab93701468c1f9f7272 (diff) |
Attempt to provide stream statistics as per #4934 request
* The new stream statistics section will be available by enabling (set to yes) the
[global]
# activate stream statistics = no
into stream.conf
* Add a new "streams" section under Netdata monitoring
* Add a memory chart that attempts to measure the amount of RAM allocated in the master server
(charts and dimensions)
* Add a "streams" chart that reports the number currently incoming streams (slaves)
* Add a "processors" chart that reports the number of active processors of all slaves
Incercepting the active processors HOST variable
* Add a "metrics" chart to count all the dimensions provided by the slaves
* Add a "processes" chart for active processes reported by the slaves
Intercepting "system.active_processes" dimension
-rw-r--r-- | collectors/plugins.d/plugins_d.c | 327 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 11 | ||||
-rw-r--r-- | streaming/rrdpush.c | 2 |
3 files changed, 232 insertions, 108 deletions
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index 714a1dd2ec..745751b908 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -2,12 +2,11 @@ #include "plugins_d.h" + +extern unsigned int stream_stats_enabled; char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { NULL }; struct plugind *pluginsd_root = NULL; -//extern RRDSET *__netdata_stream_st; - - static inline int pluginsd_space(char c) { switch(c) { case ' ': @@ -36,6 +35,34 @@ inline int config_isspace(char c) { } } +inline size_t get_set_memory_usage(RRDSET *st) +{ + size_t my_memory; + + if (unlikely(!st)) + return (size_t) 0; + + my_memory = st->memsize; + my_memory += st->plugin_name?strlen(st->plugin_name):0; + my_memory += st->module_name?strlen(st->module_name):0; + my_memory += st->config_section?strlen(st->config_section):0; + + return (size_t) my_memory; +} + +inline size_t get_dim_memory_usage(RRDDIM *rd) +{ + size_t my_memory; + + if (unlikely(!rd)) + return (size_t) 0; + + my_memory = rd->memsize; + my_memory += rd->id?strlen(rd->id):0; + my_memory += rd->cache_filename?strlen(rd->cache_filename):0; + return (size_t) my_memory; +} + // split a text into words, respecting quotes static inline int quoted_strings_splitter(char *str, char **words, int max_words, int (*custom_isspace)(char)) { char *s = str, quote = 0; @@ -226,7 +253,6 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int } size_t count = 0; - size_t total_bytes = 0; char line[PLUGINSD_LINE_MAX + 1]; @@ -241,7 +267,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int RRDSET *st = NULL; uint32_t hash; - unsigned int stream_stats_enabled = 1; + unsigned int capture_processes = 0; errno = 0; clearerr(fp); @@ -251,31 +277,113 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int goto cleanup; } - RRDSET *__netdata_stream_st = NULL; - - if (stream_stats_enabled && host != localhost) { - __netdata_stream_st = rrdset_create_localhost( - "netdata" - , host->hostname - , host->hostname - , "streams" - , NULL - , "NetData Client Stream" - , "RAM (MB)" - , "netdata" - , "stats" - , 130000 - , localhost->rrd_update_every - , RRDSET_TYPE_LINE - ); + static collected_number total_streams = 0; // total streams we are collecting data from + static collected_number total_ram = 0; // total ram consumed for streams we collect from + collected_number session_ram = 0; // RAM for one stream + static collected_number total_metrics = 0; // total metrics we collect(dimensions) + collected_number session_metrics = 0; // session metrics + static collected_number total_processes = 0; // total processes in slaves + collected_number session_processes = 0; // session processes + static collected_number total_processors = 0; // Total processors as reported by incoming streams (HOST variable active_processors) + collected_number session_processors = 0; // -- For session + static RRDSET *__stream_mem_st = NULL; // Chart for memory usage + static RRDSET *__stream_cnt_st = NULL; // for slave count + static RRDSET *__stream_prs_st = NULL; // for active processors + static RRDSET *__stream_metrics_st = NULL; // for active metrics we collect from slaves + static RRDSET *__stream_processes_st = NULL; // for total active processes in slaves + static time_t stream_updated = 0; // Used so that we Do not rrdset_done in less than a second + time_t thread_start_time; + static int stream_stats_activated = 0; + + if (likely(stream_stats_enabled) && likely(!stream_stats_activated) && likely(host != localhost)) { + stream_stats_activated = 1; + + __stream_mem_st = rrdset_create_localhost( + "netdata" + , "memory" + , NULL + , "streams" + , NULL + , "NetData Incoming Streams" + , "MB" + , "netdata" + , "stats" + , 130000 + , localhost->rrd_update_every + , RRDSET_TYPE_LINE + ); + __stream_cnt_st = rrdset_create_localhost( + "netdata" + , "stream count" + , "streams" + , "streams" + , NULL + , "NetData Incoming Streams" + , "Count" + , "netdata" + , "stats" + , 130001 + , localhost->rrd_update_every + , RRDSET_TYPE_LINE + ); + __stream_prs_st = rrdset_create_localhost( + "netdata" + , "active processors" + , "streams" + , "streams" + , NULL + , "NetData Incoming Streams" + , "Count" + , "netdata" + , "stats" + , 130002 + , localhost->rrd_update_every + , RRDSET_TYPE_LINE + ); + __stream_metrics_st = rrdset_create_localhost( + "netdata" + , "metrics" + , "streams" + , "streams" + , NULL + , "NetData Incoming Streams" + , "Count" + , "netdata" + , "stats" + , 130003 + , localhost->rrd_update_every + , RRDSET_TYPE_LINE + ); + __stream_processes_st = rrdset_create_localhost( + "netdata" + , "processes" + , "streams" + , "streams" + , NULL + , "NetData Incoming Streams" + , "Count" + , "netdata" + , "stats" + , 130004 + , localhost->rrd_update_every + , RRDSET_TYPE_LINE + ); + + if (likely(__stream_mem_st)) + rrddim_add(__stream_mem_st, PLUGINSD_STREAM_MEMORY, PLUGINSD_STREAM_MEMORY, 1, 1024 * 1024, RRD_ALGORITHM_ABSOLUTE); + if (likely(__stream_cnt_st)) + rrddim_add(__stream_cnt_st, PLUGINSD_STREAM_COUNT, PLUGINSD_STREAM_COUNT, 1, 1, RRD_ALGORITHM_ABSOLUTE); + if (likely(__stream_prs_st)) + rrddim_add(__stream_prs_st, PLUGINSD_STREAM_PROCESSORS, PLUGINSD_STREAM_PROCESSORS, 1, 1, RRD_ALGORITHM_ABSOLUTE); + if (likely(__stream_metrics_st)) + rrddim_add(__stream_metrics_st, PLUGINSD_STREAM_METRICS, PLUGINSD_STREAM_METRICS, 1, 1, RRD_ALGORITHM_ABSOLUTE); + if (likely(__stream_processes_st)) + rrddim_add(__stream_processes_st, PLUGINSD_STREAM_PROCESSES, PLUGINSD_STREAM_PROCESSES, 1, 1, RRD_ALGORITHM_ABSOLUTE); } - if (__netdata_stream_st && host != localhost && stream_stats_enabled) { - RRDDIM *slave_mem = rrddim_find(__netdata_stream_st, host->hostname); - if (!slave_mem) { - rrddim_add(__netdata_stream_st, host->hostname, NULL, 1, 1024 * 1024, RRD_ALGORITHM_ABSOLUTE); - info("adding dimension %s on streams", host->hostname); - } + if (likely(host != localhost) && likely(stream_stats_enabled)) { + total_streams++; + thread_start_time = now_realtime_sec(); } #ifdef ENABLE_HTTPS @@ -365,8 +473,18 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int enabled = 0; break; } - else - rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0)); + else { + collected_number v = strtoll(value, NULL, 0); + rrddim_set_by_pointer(st, rd, v); + if (likely(host != localhost) && likely(stream_stats_enabled) && capture_processes) { + // extra sanity check + if (strcmp(dimension,"active") == 0) { + session_processes += v; + total_processes += v; + } + capture_processes = 0; + } + } } } else if(likely(hash == BEGIN_HASH && !strcmp(s, PLUGINSD_KEYWORD_BEGIN))) { @@ -399,12 +517,13 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int else rrdset_next(st); } - //info("Total bytes = %ld, reseting", total_bytes); - - //total_bytes = 0; - - if (__netdata_stream_st) - rrdset_next(__netdata_stream_st); + if (likely(host != localhost) && likely(stream_stats_enabled)) { + capture_processes = (strcmp(id,"system.active_processes") == 0); + if (capture_processes) { + total_processes -= session_processes; + session_processes = 0; + } + } } else if(likely(hash == END_HASH && !strcmp(s, PLUGINSD_KEYWORD_END))) { if(unlikely(!st)) { @@ -416,19 +535,37 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_PLUGINSD, "requested an END on chart %s", st->id); - if (__netdata_stream_st && host != localhost) { - RRDDIM *rd_exists = rrddim_find(__netdata_stream_st, host->hostname); - if (rd_exists) { - //char name[RRDVAR_MAX_LENGTH + 1]; - //snprintfz(name, RRDVAR_MAX_LENGTH, "%s.memory", host->hostname); + if (likely(stream_stats_enabled) && (now_realtime_sec() - stream_updated > 0)) { + if (likely(__stream_mem_st)) { + rrddim_set(__stream_mem_st, PLUGINSD_STREAM_MEMORY, (collected_number) total_ram); + //rrdset_next(__stream_mem_st); + rrdset_done(__stream_mem_st); + } + + if (likely(__stream_cnt_st)) { + rrddim_set(__stream_cnt_st, PLUGINSD_STREAM_COUNT, (collected_number) total_streams); + //rrdset_next(__stream_cnt_st); + rrdset_done(__stream_cnt_st); + } + + if (likely(__stream_prs_st)) { + rrddim_set(__stream_prs_st, PLUGINSD_STREAM_PROCESSORS, (collected_number) total_processors); + //rrdset_next(__stream_prs_st); + rrdset_done(__stream_prs_st); + } + + if (likely(__stream_metrics_st)) { + rrddim_set(__stream_metrics_st, PLUGINSD_STREAM_METRICS, (collected_number) total_metrics); + //rrdset_next(__stream_metrics_st); + rrdset_done(__stream_metrics_st); + } - //RRDSETVAR *rs = rrdsetvar_custom_chart_variable_create(__netdata_stream_st, name); - //if (rs) - //rrddim_set(__netdata_stream_st, host->hostname, (collected_number) *(calculated_number *) rs->value); - rrddim_set(__netdata_stream_st, host->hostname, (collected_number) total_bytes); - } - rrdset_next(__netdata_stream_st); - rrdset_done(__netdata_stream_st); + if (likely(__stream_processes_st)) { + rrddim_set(__stream_processes_st, PLUGINSD_STREAM_PROCESSES, (collected_number) total_processes); + //rrdset_next(__stream_processes_st); + rrdset_done(__stream_processes_st); + } + stream_updated = now_realtime_sec(); } rrdset_done(st); @@ -506,11 +643,11 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int , priority , update_every ); - - //char fullid[RRD_ID_LENGTH_MAX + 1]; - //snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id); - //RRDSET *exists_st = rrdset_find(host, fullid); + char fullid[RRD_ID_LENGTH_MAX + 1]; + snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id); + RRDSET *old_st = rrdset_find(host, fullid); + st = rrdset_create( host , type @@ -527,30 +664,12 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int , chart_type ); - total_bytes += st->memsize; - - /*{ - if (__netdata_stream_st && host != localhost && !exists_st) { - RRDDIM *rd_exists = rrddim_find(__netdata_stream_st, host->hostname); - if (rd_exists) { - char name[RRDVAR_MAX_LENGTH + 1]; - snprintfz(name, RRDVAR_MAX_LENGTH, "%s.memory", host->hostname); - //calculated_number v = (calculated_number) st->memsize; - RRDSETVAR *rs = rrdsetvar_custom_chart_variable_create(__netdata_stream_st, name); - if (rs) { - //rrdsetvar_custom_chart_variable_set(rs, v); - rrdsetvar_custom_chart_variable_set(rs, (calculated_number) *((calculated_number *) rs->value) + (calculated_number) st->memsize); - info("Updated variable %s with value %Lf for dimension %s %s", name, - (calculated_number) *((calculated_number *) rs->value) + (calculated_number) st->memsize, - st->id, id); - } - else { - error("cannot find/create variable '%s' for incoming stream on host '%s'", name, host->hostname); - } - } - } - }*/ - + if (likely(host != localhost) && likely(stream_stats_enabled) + && (old_st != st || ((now_realtime_sec() - thread_start_time) < PLUGINSD_STREAM_INIT_ALLOW_TIME))) { + size_t compute_size = get_set_memory_usage(st); + session_ram += compute_size; // size for current stream + total_ram += compute_size; // keep size for all streams + } if(options && *options) { if(strstr(options, "obsolete")) @@ -619,27 +738,18 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int , divisor , options?options:"" ); - - //RRDDIM *rd_exists = rrddim_find(st, id); + + RRDDIM *old_rd = rrddim_find(st, id); RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm)); - total_bytes += rd->memsize; - - // calculate memory - /*if (__netdata_stream_st && host != localhost && !rd_exists) { - char name[RRDVAR_MAX_LENGTH + 1]; - snprintfz(name, RRDVAR_MAX_LENGTH, "%s.memory", host->hostname); - RRDSETVAR *rs = rrdsetvar_custom_chart_variable_create(__netdata_stream_st, name); - if (rs) { - rrdsetvar_custom_chart_variable_set(rs, (calculated_number) *((calculated_number *) rs->value) + (calculated_number) rd->memsize); - //info("Updated variable %s with value %Lf for dimension %s %s", name, - // (calculated_number) *((calculated_number *) rs->value) + (calculated_number) rd->memsize, - // st->id, id); - } - else { - error("cannot find/create variable '%s' for incoming stream on host '%s'", name, host->hostname); - } - }*/ + if (likely(host != localhost) && likely(stream_stats_enabled) + && (old_rd != rd || ((now_realtime_sec() - thread_start_time) < PLUGINSD_STREAM_INIT_ALLOW_TIME))) { + size_t compute_size = get_dim_memory_usage(rd); + session_ram += compute_size; + total_ram += compute_size; + session_metrics++; + total_metrics++; + } rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN); rrddim_flag_clear(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS); @@ -696,7 +806,13 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int if(global) { RRDVAR *rv = rrdvar_custom_host_variable_create(host, name); - if (rv) rrdvar_custom_host_variable_set(host, rv, v); + if (rv) { + rrdvar_custom_host_variable_set(host, rv, v); + if (likely(host != localhost) && likely(stream_stats_enabled) && strcmp(name,"active_processors") == 0) { + session_processors += v; + total_processors += v; + } + } else error("cannot find/create HOST VARIABLE '%s' on host '%s'", name, host->hostname); } else if(st) { @@ -729,18 +845,13 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int cleanup: cd->enabled = enabled; - if (host != localhost) { - if (__netdata_stream_st) { - RRDDIM *rd_exists = rrddim_find(__netdata_stream_st, host->hostname); - if (rd_exists) { - rrddim_hide(__netdata_stream_st, host->hostname); - rrddim_is_obsolete(__netdata_stream_st, rd_exists); - rrddim_free(__netdata_stream_st, rd_exists); - } - rrdset_flag_set(__netdata_stream_st, RRDSET_FLAG_HIDDEN); - rrdset_is_obsolete(__netdata_stream_st); - } - } + if (likely(host != localhost) && likely(stream_stats_enabled)) { + total_streams--; + total_ram -= session_ram; + total_processors -= session_processors; + total_metrics -= session_metrics; + total_processes -= session_processes; + } if(likely(count)) { cd->successful_collections += count; diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index 7d5c7dda47..7f2d35308f 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -34,6 +34,14 @@ #define PLUGINSD_LINE_MAX_SSL_READ 512 #define PLUGINSD_MAX_WORDS 20 +#define PLUGINSD_STREAM_INIT_ALLOW_TIME 120 // Do not check for existing SET or DIMs within these seconds after slave thread init + +#define PLUGINSD_STREAM_MEMORY "ram" +#define PLUGINSD_STREAM_COUNT "streams" +#define PLUGINSD_STREAM_PROCESSORS "processors" +#define PLUGINSD_STREAM_METRICS "metrics" +#define PLUGINSD_STREAM_PROCESSES "processes" + #define PLUGINSD_MAX_DIRECTORIES 20 extern char *plugin_directories[PLUGINSD_MAX_DIRECTORIES]; @@ -73,4 +81,7 @@ extern int pluginsd_initialize_plugin_directories(); extern int config_isspace(char c); +extern size_t get_set_memory_usage(RRDSET *st); +extern size_t get_dim_memory_usage(RRDDIM *rd); + #endif /* NETDATA_PLUGINS_D_H */ diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index c798cd1b19..7acff57ed7 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -44,6 +44,7 @@ static struct config stream_config = { } }; +unsigned int stream_stats_enabled = 0; unsigned int default_rrdpush_enabled = 0; char *default_rrdpush_destination = NULL; char *default_rrdpush_api_key = NULL; @@ -76,6 +77,7 @@ int rrdpush_init() { default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", ""); default_rrdpush_send_charts_matching = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "send charts matching", "*"); rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time); + stream_stats_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_GLOBAL, "activate stream statistics", 0); if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) { error("STREAM [send]: cannot enable sending thread - information is missing."); |