diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-02-09 20:27:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-09 20:27:05 +0200 |
commit | 414f416c5d290db3c3eed9073258c834fac7f2f7 (patch) | |
tree | aeed0a2619a1f42c60ec816311d1dd10c8e5ccc1 /collectors | |
parent | 8043106b60ec3d8d32b3a9ff3ef53991e73d9037 (diff) |
Virtual hosts for data collection (#14464)
* support multiple hosts at pluginsd structures
* cleanup obsolete code
* use a lookup hashtable to quickly find the keyword to execute, without traversing the whole linked list of keywords
* more cleanup
* move new hash function to inlined.h
* minimize comparisons, eliminate a pre-parsing of the first keyword for each line
* cleanup parser from old code
* move parser into libnetdata
* unique entries in parser keywords hashtable
* move all hashing functions to inlined.h, name their sources, simple_hash() now defaults to FNV1a, it was FNV1
* small_hash() for parser
* plugins.d now can switch hosts, and also create/update them
* update hash function and hashtable size
* updated message
* unittest all hashing functions
* reset the chart when setting a new host
* remove host tags
* enable archived hosts when a collector pushes host info
* do not need localhost to swtich to localhost
* disable ARAL and OWA with -DFSANITIZE_ADDRESS=1
Diffstat (limited to 'collectors')
-rw-r--r-- | collectors/apps.plugin/apps_plugin.c | 4 | ||||
-rw-r--r-- | collectors/plugins.d/README.md | 77 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.c | 93 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 9 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 288 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.h | 17 | ||||
-rw-r--r-- | collectors/statsd.plugin/statsd.c | 2 |
7 files changed, 434 insertions, 56 deletions
diff --git a/collectors/apps.plugin/apps_plugin.c b/collectors/apps.plugin/apps_plugin.c index 84506c8e19..b432abb6e7 100644 --- a/collectors/apps.plugin/apps_plugin.c +++ b/collectors/apps.plugin/apps_plugin.c @@ -4336,7 +4336,7 @@ static void apps_plugin_function_processes(const char *transaction, char *functi struct pid_stat *p; char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = pluginsd_split_words(function, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); + size_t num_words = pluginsd_split_words(function, words, PLUGINSD_MAX_WORDS); struct target *category = NULL, *user = NULL, *group = NULL; const char *process_name = NULL; @@ -4809,7 +4809,7 @@ void *reader_main(void *arg __maybe_unused) { while(!apps_plugin_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) { char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = pluginsd_split_words(buffer, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); + size_t num_words = pluginsd_split_words(buffer, words, PLUGINSD_MAX_WORDS); const char *keyword = get_word(words, num_words, 0); diff --git a/collectors/plugins.d/README.md b/collectors/plugins.d/README.md index 8ad1d3a65d..375099b128 100644 --- a/collectors/plugins.d/README.md +++ b/collectors/plugins.d/README.md @@ -175,6 +175,83 @@ The plugin should output instructions for Netdata to its output (`stdout`). Sinc `DISABLE` will disable this plugin. This will prevent Netdata from restarting the plugin. You can also exit with the value `1` to have the same effect. +#### HOST_DEFINE + +`HOST_DEFINE` defines a new (or updates an existing) virtual host. + +The template is: + +> HOST_DEFINE machine_guid hostname + +where: + +- `machine_guid` + + uniquely identifies the host, this is what will be needed to add charts to the host. + +- `hostname` + + is the hostname of the virtual host + +#### HOST_LABEL + +`HOST_LABEL` adds a key-value pair to the virtual host labels. It has to be given between `HOST_DEFINE` and `HOST_DEFINE_END`. + +The template is: + +> HOST_LABEL key value + +where: + +- `key` + + uniquely identifies the key of the label + +- `value` + + is the value associated with this key + +There are a few special keys that are used to define the system information of the monitored system: + +- `_cloud_provider_type` +- `_cloud_instance_type` +- `_cloud_instance_region` +- `_os_name` +- `_os_version` +- `_kernel_version` +- `_system_cores` +- `_system_cpu_freq` +- `_system_ram_total` +- `_system_disk_space` +- `_architecture` +- `_virtualization` +- `_container` +- `_container_detection` +- `_virt_detection` +- `_is_k8s_node` +- `_install_type` +- `_prebuilt_arch` +- `_prebuilt_dist` + +#### HOST_DEFINE_END + +`HOST_DEFINE_END` commits the host information, creating a new host entity, or updating an existing one with the same `machine_guid`. + +#### HOST + +`HOST` switches data collection between hosts. + +The template is: + +> HOST machine_guid + +where: + +- `machine_guid` + + is the UUID of the host to switch to. After this command, every other command following it is assumed to be associated with this host. + Setting machine_guid to `localhost` switches data collection to the local host. + #### CHART `CHART` defines a new chart. diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index 7608f3afcb..dc13cd2eec 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -18,7 +18,7 @@ inline size_t pluginsd_initialize_plugin_directories() } // Parse it and store it to plugin directories - return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0); + return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace); } static inline void plugin_set_disabled(struct plugind *cd) { @@ -51,6 +51,8 @@ static void pluginsd_worker_thread_cleanup(void *arg) { struct plugind *cd = (struct plugind *)arg; + worker_unregister(); + netdata_spinlock_lock(&cd->unsafe.spinlock); cd->unsafe.running = false; @@ -62,74 +64,73 @@ static void pluginsd_worker_thread_cleanup(void *arg) netdata_spinlock_unlock(&cd->unsafe.spinlock); if (pid) { - info("data collection thread exiting"); - siginfo_t info; - info("killing child process pid %d", pid); + info("PLUGINSD: 'host:%s', killing data collection child process with pid %d", + rrdhost_hostname(cd->host), pid); + if (killpid(pid) != -1) { - info("waiting for child process pid %d to exit...", pid); + info("PLUGINSD: 'host:%s', waiting for data collection child process pid %d to exit...", + rrdhost_hostname(cd->host), pid); + waitid(P_PID, (id_t)pid, &info, WEXITED); } } } #define SERIAL_FAILURES_THRESHOLD 10 -static void pluginsd_worker_thread_handle_success(struct plugind *cd) -{ +static void pluginsd_worker_thread_handle_success(struct plugind *cd) { if (likely(cd->successful_collections)) { sleep((unsigned int)cd->update_every); return; } if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) { - info( - "'%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.", - cd->fullfilename, cd->unsafe.pid, - plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); + info("PLUGINSD: 'host:%s', '%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, + plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); + sleep((unsigned int)(cd->update_every * 10)); return; } if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) { - error( - "'%s' (pid %d) does not generate useful output, although it reports success (exits with 0)." - "We have tried to collect something %zu times - unsuccessfully. Disabling it.", - cd->fullfilename, cd->unsafe.pid, cd->serial_failures); + error("PLUGINSD: 'host:'%s', '%s' (pid %d) does not generate useful output, " + "although it reports success (exits with 0)." + "We have tried to collect something %zu times - unsuccessfully. Disabling it.", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, cd->serial_failures); plugin_set_disabled(cd); return; } } -static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) -{ +static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) { if (worker_ret_code == -1) { - info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->unsafe.pid); + info("PLUGINSD: 'host:%s', '%s' (pid %d) was killed with SIGTERM. Disabling it.", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid); plugin_set_disabled(cd); return; } if (!cd->successful_collections) { - error( - "'%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", cd->fullfilename, - cd->unsafe.pid, worker_ret_code); + error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code); plugin_set_disabled(cd); return; } if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) { - error( - "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s", - cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, - plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); + error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, + plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); sleep((unsigned int)(cd->update_every * 10)); return; } if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) { - error( - "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)." - "We tried to restart it %zu times, but it failed to generate data. Disabling it.", - cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, cd->serial_failures); + error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)." + "We tried to restart it %zu times, but it failed to generate data. Disabling it.", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, + cd->successful_collections, cd->serial_failures); plugin_set_disabled(cd); return; } @@ -137,8 +138,7 @@ static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_r #undef SERIAL_FAILURES_THRESHOLD -static void *pluginsd_worker_thread(void *arg) -{ +static void *pluginsd_worker_thread(void *arg) { worker_register("PLUGINSD"); netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg); @@ -151,14 +151,20 @@ static void *pluginsd_worker_thread(void *arg) while (service_running(SERVICE_COLLECTORS)) { FILE *fp_child_input = NULL; FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input); + if (unlikely(!fp_child_input || !fp_child_output)) { - error("Cannot popen(\"%s\", \"r\").", cd->cmd); + error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", rrdhost_hostname(cd->host), cd->cmd); break; } - info("connected to '%s' running on pid %d", cd->fullfilename, cd->unsafe.pid); - count = pluginsd_process(localhost, cd, fp_child_input, fp_child_output, 0); - error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->unsafe.pid, count); + info("PLUGINSD: 'host:%s' connected to '%s' running on pid %d", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid); + + count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0); + + info("PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).", + rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count); + killpid(cd->unsafe.pid); int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid); @@ -172,29 +178,29 @@ static void *pluginsd_worker_thread(void *arg) if (unlikely(!plugin_is_enabled(cd))) break; } - worker_unregister(); netdata_thread_cleanup_pop(1); return NULL; } -static void pluginsd_main_cleanup(void *data) -{ +static void pluginsd_main_cleanup(void *data) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data; static_thread->enabled = NETDATA_MAIN_THREAD_EXITING; - info("cleaning up..."); + info("PLUGINSD: cleaning up..."); struct plugind *cd; for (cd = pluginsd_root; cd; cd = cd->next) { netdata_spinlock_lock(&cd->unsafe.spinlock); if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) { - info("stopping plugin thread: %s", cd->id); + info("PLUGINSD: 'host:%s', stopping plugin thread: %s", + rrdhost_hostname(cd->host), cd->id); + netdata_thread_cancel(cd->unsafe.thread); } netdata_spinlock_unlock(&cd->unsafe.spinlock); } - info("cleanup completed."); + info("PLUGINSD: cleanup completed."); static_thread->enabled = NETDATA_MAIN_THREAD_EXITED; worker_unregister(); @@ -282,6 +288,7 @@ void *pluginsd_main(void *ptr) strncpyz(cd->filename, file->d_name, FILENAME_MAX); snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename); + cd->host = localhost; cd->unsafe.enabled = enabled; cd->unsafe.running = false; @@ -294,9 +301,7 @@ void *pluginsd_main(void *ptr) config_get(cd->id, "command options", def)); // link it - if (likely(pluginsd_root)) - cd->next = pluginsd_root; - pluginsd_root = cd; + DOUBLE_LINKED_LIST_PREPEND_ITEM_UNSAFE(pluginsd_root, cd, prev, next); if (plugin_is_enabled(cd)) { char tag[NETDATA_THREAD_TAG_MAX + 1]; diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index 5c7da73361..865488d379 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -38,6 +38,11 @@ #define PLUGINSD_KEYWORD_SET_V2 "SET2" #define PLUGINSD_KEYWORD_END_V2 "END2" +#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE" +#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END" +#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL" +#define PLUGINSD_KEYWORD_HOST "HOST" + #define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds #define PLUGINSD_LINE_MAX_SSL_READ 512 @@ -60,6 +65,7 @@ struct plugind { size_t serial_failures; // the number of times the plugin started // without collecting values + RRDHOST *host; // the host the plugin collects data for int update_every; // the plugin default data collection frequency struct { @@ -71,7 +77,8 @@ struct plugind { } unsafe; time_t started_t; - uint32_t capabilities; // follows the same principles as streaming capabilities + + struct plugind *prev; struct plugind *next; }; diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 448e13de28..891968e85e 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -152,7 +152,7 @@ static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, cons if(likely(st->pluginsd.pos < st->pluginsd.used)) { rda = st->pluginsd.rda[st->pluginsd.pos]; RRDDIM *rd = rrddim_acquired_to_rrddim(rda); - if (likely(rd && strcmp(rrddim_id(rd), dimension) == 0)) { + if (likely(rd && string_strcmp(rd->id, dimension) == 0)) { st->pluginsd.pos++; return rd; } @@ -297,6 +297,146 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user) return PARSER_RC_OK; } +static void pluginsd_host_define_cleanup(void *user) { + PARSER_USER_OBJECT *u = user; + + string_freez(u->host_define.hostname); + dictionary_destroy(u->host_define.rrdlabels); + + u->host_define.hostname = NULL; + u->host_define.rrdlabels = NULL; + u->host_define.parsing_host = false; +} + +static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) { + if(uuid_parse(guid, *uuid)) + return false; + + uuid_unparse_lower(*uuid, output); + + return true; +} + +static PARSER_RC pluginsd_host_define(char **words, size_t num_words, void *user) { + PARSER_USER_OBJECT *u = user; + + char *guid = get_word(words, num_words, 1); + char *hostname = get_word(words, num_words, 2); + + if(unlikely(!guid || !*guid || !hostname || !*hostname)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters"); + + if(unlikely(u->host_define.parsing_host)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, + "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?"); + + if(!pluginsd_validate_machine_guid(guid, &u->host_define.machine_guid, u->host_define.machine_guid_str)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?"); + + u->host_define.hostname = string_strdupz(hostname); + u->host_define.rrdlabels = rrdlabels_create(); + u->host_define.parsing_host = true; + + return PARSER_RC_OK; +} + +static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, void *user, DICTIONARY *dict, const char *keyword) { + PARSER_USER_OBJECT *u = user; + + char *name = get_word(words, num_words, 1); + char *value = get_word(words, num_words, 2); + + if(!name || !*name || !value) + return PLUGINSD_DISABLE_PLUGIN(user, keyword, "missing parameters"); + + if(!u->host_define.parsing_host || !dict) + return PLUGINSD_DISABLE_PLUGIN(user, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); + + rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG); + + return PARSER_RC_OK; +} + +static PARSER_RC pluginsd_host_labels(char **words, size_t num_words, void *user) { + PARSER_USER_OBJECT *u = user; + return pluginsd_host_dictionary(words, num_words, user, u->host_define.rrdlabels, PLUGINSD_KEYWORD_HOST_LABEL); +} + +static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { + PARSER_USER_OBJECT *u = user; + + if(!u->host_define.parsing_host) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); + + RRDHOST *host = rrdhost_find_or_create( + string2str(u->host_define.hostname), + string2str(u->host_define.hostname), + u->host_define.machine_guid_str, + "Netdata Virtual Host 1.0", + netdata_configured_timezone, + netdata_configured_abbrev_timezone, + netdata_configured_utc_offset, + NULL, + program_name, + program_version, + default_rrd_update_every, + default_rrd_history_entries, + default_rrd_memory_mode, + default_health_enabled, + default_rrdpush_enabled, + default_rrdpush_destination, + default_rrdpush_api_key, + default_rrdpush_send_charts_matching, + default_rrdpush_enable_replication, + default_rrdpush_seconds_to_replicate, + default_rrdpush_replication_step, + rrdhost_labels_to_system_info(u->host_define.rrdlabels), + false + ); + + if(host->rrdlabels) { + rrdlabels_migrate_to_these(host->rrdlabels, u->host_define.rrdlabels); + } + else { + host->rrdlabels = u->host_define.rrdlabels; + u->host_define.rrdlabels = NULL; + } + + pluginsd_host_define_cleanup(user); + + u->host = host; + pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END); + + rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); + rrdcontext_host_child_connected(host); + + return PARSER_RC_OK; +} + +static PARSER_RC pluginsd_host(char **words, size_t num_words, void *user) { + PARSER_USER_OBJECT *u = user; + + char *guid = get_word(words, num_words, 1); + + if(!guid || !*guid || strcmp(guid, "localhost") == 0) { + u->host = localhost; + return PARSER_RC_OK; + } + + uuid_t uuid; + char uuid_str[UUID_STR_LEN]; + if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?"); + + RRDHOST *host = rrdhost_find_by_guid(uuid_str); + if(unlikely(!host)) + return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?"); + + u->host = host; + + return PARSER_RC_OK; +} + PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) { RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART); @@ -883,7 +1023,7 @@ PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe { info("PLUGINSD: plugin called DISABLE. Disabling it."); ((PARSER_USER_OBJECT *) user)->enabled = 0; - return PARSER_RC_ERROR; + return PARSER_RC_STOP; } PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) @@ -1654,8 +1794,8 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_ static void pluginsd_process_thread_cleanup(void *ptr) { PARSER *parser = (PARSER *)ptr; - if(parser->user_cleanup_cb) - parser->user_cleanup_cb(parser->user); + pluginsd_cleanup_v2(parser->user); + pluginsd_host_define_cleanup(parser->user); rrd_collector_finished(); parser_destroy(parser); @@ -1695,7 +1835,10 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi }; // fp_plugin_output = our input; fp_plugin_input = our output - PARSER *parser = parser_init(host, &user, NULL, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL); + PARSER *parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, + PARSER_INPUT_SPLIT, NULL); + + pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD); rrd_collector_started(); @@ -1704,9 +1847,10 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); user.parser = parser; + char buffer[PLUGINSD_LINE_MAX + 1]; - while (likely(!parser_next(parser))) { - if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, NULL))) + while (likely(!parser_next(parser, buffer, PLUGINSD_LINE_MAX))) { + if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, buffer))) break; } @@ -1725,3 +1869,133 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi return count; } + +static void pluginsd_keywords_init_internal(PARSER *parser, PLUGINSD_KEYWORDS types, void (*add_func)(PARSER *parser, char *keyword, keyword_function func)) { + + if (types & PARSER_INIT_PLUGINSD) { + add_func(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush); + add_func(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable); + + add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE, pluginsd_host_define); + add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, pluginsd_host_define_end); + add_func(parser, PLUGINSD_KEYWORD_HOST_LABEL, pluginsd_host_labels); + add_func(parser, PLUGINSD_KEYWORD_HOST, pluginsd_host); + } + + if (types & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) { + // plugins.d plugins and streaming + add_func(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart); + add_func(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension); + add_func(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable); + add_func(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label); + add_func(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite); + add_func(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit); + add_func(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel); + add_func(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function); + add_func(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin); + + add_func(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin); + add_func(parser, PLUGINSD_KEYWORD_SET, pluginsd_set); + add_func(parser, PLUGINSD_KEYWORD_END, pluginsd_end); + + inflight_functions_init(parser); + } + + if (types & PARSER_INIT_STREAMING) { + add_func(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end); + + // replication + add_func(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_begin); + add_func(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set); + add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state); + add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state); + add_func(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end); + + // streaming metrics v2 + add_func(parser, PLUGINSD_KEYWORD_BEGIN_V2, pluginsd_begin_v2); + add_func(parser, PLUGINSD_KEYWORD_SET_V2, pluginsd_set_v2); + add_func(parser, PLUGINSD_KEYWORD_END_V2, pluginsd_end_v2); + } +} + +void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types) { + pluginsd_keywords_init_internal(parser, types, parser_add_keyword); +} + +struct pluginsd_user_unittest { + size_t size; + const char **hashtable; + uint32_t (*hash)(const char *s); + size_t collisions; +}; + +void pluginsd_keyword_collision_check(PARSER *parser, char *keyword, keyword_function func __maybe_unused) { + struct pluginsd_user_unittest *u = parser->user; + + uint32_t hash = u->hash(keyword); + uint32_t slot = hash % u->size; + + if(u->hashtable[slot]) + u->collisions++; + + u->hashtable[slot] = keyword; +} + +static struct { + const char *name; + uint32_t (*hash)(const char *s); + size_t slots_needed; +} hashers[] = { + { .name = "djb2_hash32(s)", djb2_hash32, .slots_needed = 0, }, + { .name = "fnv1_hash32(s)", fnv1_hash32, .slots_needed = 0, }, + { .name = "fnv1a_hash32(s)", fnv1a_hash32, .slots_needed = 0, }, + { .name = "larson_hash32(s)", larson_hash32, .slots_needed = 0, }, + { .name = "pluginsd_parser_hash32(s)", pluginsd_parser_hash32, .slots_needed = 0, }, + + // terminator + { .name = NULL, NULL, .slots_needed = 0, }, +}; + +int pluginsd_parser_unittest(void) { + PARSER *p; + size_t slots_to_check = 1000; + size_t i, h; + + // check for hashtable collisions + for(h = 0; hashers[h].name ;h++) { + hashers[h].slots_needed = slots_to_check * 1000000; + + for (i = 10; i < slots_to_check; i++) { + struct pluginsd_user_unittest user = { + .hash = hashers[h].hash, + .size = i, + .hashtable = callocz(i, sizeof(const char *)), + .collisions = 0, + }; + + p = parser_init(&user, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL); + pluginsd_keywords_init_internal(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING, + pluginsd_keyword_collision_check); + parser_destroy(p); + + freez(user.hashtable); + + if (!user.collisions) { + hashers[h].slots_needed = i; + break; + } + } + } + + for(h = 0; hashers[h].name ;h++) { + if(hashers[h].slots_needed > 1000) + info("PARSER: hash function '%s' cannot be used without collisions under %zu slots", hashers[h].name, slots_to_check); + else + info("PARSER: hash function '%s' needs PARSER_KEYWORDS_HASHTABLE_SIZE (in parser.h) set to %zu", hashers[h].name, hashers[h].slots_needed); + } + + p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL); + pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING); + parser_destroy(p); + return 0; +} diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index dc43630ed5..57829ca349 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -3,7 +3,12 @@ #ifndef NETDATA_PLUGINSD_PARSER_H #define NETDATA_PLUGINSD_PARSER_H -#include "parser/parser.h" +#include "daemon/common.h" + +typedef enum __attribute__ ((__packed__)) { + PARSER_INIT_PLUGINSD = (1 << 1), + PARSER_INIT_STREAMING = (1 << 2), +} PLUGINSD_KEYWORDS; typedef struct parser_user_object { PARSER *parser; @@ -17,6 +22,14 @@ typedef struct parser_user_object { size_t data_collections_count; int enabled; + struct { + bool parsing_host; + uuid_t machine_guid; + char machine_guid_str[UUID_STR_LEN]; + STRING *hostname; + DICTIONARY *rrdlabels; + } host_define; + struct parser_user_object_replay { time_t start_time; time_t end_time; @@ -42,4 +55,6 @@ typedef struct parser_user_object { PARSER_RC pluginsd_function(char **words, size_t num_words, void *user); PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user); void inflight_functions_init(PARSER *parser); +void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types); + #endif //NETDATA_PLUGINSD_PARSER_H diff --git a/collectors/statsd.plugin/statsd.c b/collectors/statsd.plugin/statsd.c index d15129b9c7..343ec42520 100644 --- a/collectors/statsd.plugin/statsd.c +++ b/collectors/statsd.plugin/statsd.c @@ -1480,7 +1480,7 @@ static int statsd_readfile(const char *filename, STATSD_APP *app, STATSD_APP_CHA else if (!strcmp(name, "dimension")) { // metric [name [type [multiplier [divisor]]]] char *words[10] = { NULL }; - size_t num_words = pluginsd_split_words(value, words, 10, NULL, NULL, 0); + size_t num_words = pluginsd_split_words(value, words, 10); int pattern = 0; size_t i = 0; |