summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-02-09 20:27:05 +0200
committerGitHub <noreply@github.com>2023-02-09 20:27:05 +0200
commit414f416c5d290db3c3eed9073258c834fac7f2f7 (patch)
treeaeed0a2619a1f42c60ec816311d1dd10c8e5ccc1 /collectors/plugins.d
parent8043106b60ec3d8d32b3a9ff3ef53991e73d9037 (diff)
Virtual hosts for data collection (#14464)
* support multiple hosts at pluginsd structures * cleanup obsolete code * use a lookup hashtable to quickly find the keyword to execute, without traversing the whole linked list of keywords * more cleanup * move new hash function to inlined.h * minimize comparisons, eliminate a pre-parsing of the first keyword for each line * cleanup parser from old code * move parser into libnetdata * unique entries in parser keywords hashtable * move all hashing functions to inlined.h, name their sources, simple_hash() now defaults to FNV1a, it was FNV1 * small_hash() for parser * plugins.d now can switch hosts, and also create/update them * update hash function and hashtable size * updated message * unittest all hashing functions * reset the chart when setting a new host * remove host tags * enable archived hosts when a collector pushes host info * do not need localhost to swtich to localhost * disable ARAL and OWA with -DFSANITIZE_ADDRESS=1
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r--collectors/plugins.d/README.md77
-rw-r--r--collectors/plugins.d/plugins_d.c93
-rw-r--r--collectors/plugins.d/plugins_d.h9
-rw-r--r--collectors/plugins.d/pluginsd_parser.c288
-rw-r--r--collectors/plugins.d/pluginsd_parser.h17
5 files changed, 431 insertions, 53 deletions
diff --git a/collectors/plugins.d/README.md b/collectors/plugins.d/README.md
index 8ad1d3a65d..375099b128 100644
--- a/collectors/plugins.d/README.md
+++ b/collectors/plugins.d/README.md
@@ -175,6 +175,83 @@ The plugin should output instructions for Netdata to its output (`stdout`). Sinc
`DISABLE` will disable this plugin. This will prevent Netdata from restarting the plugin. You can also exit with the value `1` to have the same effect.
+#### HOST_DEFINE
+
+`HOST_DEFINE` defines a new (or updates an existing) virtual host.
+
+The template is:
+
+> HOST_DEFINE machine_guid hostname
+
+where:
+
+- `machine_guid`
+
+ uniquely identifies the host, this is what will be needed to add charts to the host.
+
+- `hostname`
+
+ is the hostname of the virtual host
+
+#### HOST_LABEL
+
+`HOST_LABEL` adds a key-value pair to the virtual host labels. It has to be given between `HOST_DEFINE` and `HOST_DEFINE_END`.
+
+The template is:
+
+> HOST_LABEL key value
+
+where:
+
+- `key`
+
+ uniquely identifies the key of the label
+
+- `value`
+
+ is the value associated with this key
+
+There are a few special keys that are used to define the system information of the monitored system:
+
+- `_cloud_provider_type`
+- `_cloud_instance_type`
+- `_cloud_instance_region`
+- `_os_name`
+- `_os_version`
+- `_kernel_version`
+- `_system_cores`
+- `_system_cpu_freq`
+- `_system_ram_total`
+- `_system_disk_space`
+- `_architecture`
+- `_virtualization`
+- `_container`
+- `_container_detection`
+- `_virt_detection`
+- `_is_k8s_node`
+- `_install_type`
+- `_prebuilt_arch`
+- `_prebuilt_dist`
+
+#### HOST_DEFINE_END
+
+`HOST_DEFINE_END` commits the host information, creating a new host entity, or updating an existing one with the same `machine_guid`.
+
+#### HOST
+
+`HOST` switches data collection between hosts.
+
+The template is:
+
+> HOST machine_guid
+
+where:
+
+- `machine_guid`
+
+ is the UUID of the host to switch to. After this command, every other command following it is assumed to be associated with this host.
+ Setting machine_guid to `localhost` switches data collection to the local host.
+
#### CHART
`CHART` defines a new chart.
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index 7608f3afcb..dc13cd2eec 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -18,7 +18,7 @@ inline size_t pluginsd_initialize_plugin_directories()
}
// Parse it and store it to plugin directories
- return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0);
+ return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace);
}
static inline void plugin_set_disabled(struct plugind *cd) {
@@ -51,6 +51,8 @@ static void pluginsd_worker_thread_cleanup(void *arg)
{
struct plugind *cd = (struct plugind *)arg;
+ worker_unregister();
+
netdata_spinlock_lock(&cd->unsafe.spinlock);
cd->unsafe.running = false;
@@ -62,74 +64,73 @@ static void pluginsd_worker_thread_cleanup(void *arg)
netdata_spinlock_unlock(&cd->unsafe.spinlock);
if (pid) {
- info("data collection thread exiting");
-
siginfo_t info;
- info("killing child process pid %d", pid);
+ info("PLUGINSD: 'host:%s', killing data collection child process with pid %d",
+ rrdhost_hostname(cd->host), pid);
+
if (killpid(pid) != -1) {
- info("waiting for child process pid %d to exit...", pid);
+ info("PLUGINSD: 'host:%s', waiting for data collection child process pid %d to exit...",
+ rrdhost_hostname(cd->host), pid);
+
waitid(P_PID, (id_t)pid, &info, WEXITED);
}
}
}
#define SERIAL_FAILURES_THRESHOLD 10
-static void pluginsd_worker_thread_handle_success(struct plugind *cd)
-{
+static void pluginsd_worker_thread_handle_success(struct plugind *cd) {
if (likely(cd->successful_collections)) {
sleep((unsigned int)cd->update_every);
return;
}
if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) {
- info(
- "'%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.",
- cd->fullfilename, cd->unsafe.pid,
- plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled.");
+ info("PLUGINSD: 'host:%s', '%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid,
+ plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled.");
+
sleep((unsigned int)(cd->update_every * 10));
return;
}
if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) {
- error(
- "'%s' (pid %d) does not generate useful output, although it reports success (exits with 0)."
- "We have tried to collect something %zu times - unsuccessfully. Disabling it.",
- cd->fullfilename, cd->unsafe.pid, cd->serial_failures);
+ error("PLUGINSD: 'host:'%s', '%s' (pid %d) does not generate useful output, "
+ "although it reports success (exits with 0)."
+ "We have tried to collect something %zu times - unsuccessfully. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, cd->serial_failures);
plugin_set_disabled(cd);
return;
}
}
-static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code)
-{
+static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) {
if (worker_ret_code == -1) {
- info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->unsafe.pid);
+ info("PLUGINSD: 'host:%s', '%s' (pid %d) was killed with SIGTERM. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid);
plugin_set_disabled(cd);
return;
}
if (!cd->successful_collections) {
- error(
- "'%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", cd->fullfilename,
- cd->unsafe.pid, worker_ret_code);
+ error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code);
plugin_set_disabled(cd);
return;
}
if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) {
- error(
- "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s",
- cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections,
- plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled.");
+ error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections,
+ plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled.");
sleep((unsigned int)(cd->update_every * 10));
return;
}
if (cd->serial_failures > SERIAL_FAILURES_THRESHOLD) {
- error(
- "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)."
- "We tried to restart it %zu times, but it failed to generate data. Disabling it.",
- cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, cd->serial_failures);
+ error("PLUGINSD: 'host:%s', '%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)."
+ "We tried to restart it %zu times, but it failed to generate data. Disabling it.",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, worker_ret_code,
+ cd->successful_collections, cd->serial_failures);
plugin_set_disabled(cd);
return;
}
@@ -137,8 +138,7 @@ static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_r
#undef SERIAL_FAILURES_THRESHOLD
-static void *pluginsd_worker_thread(void *arg)
-{
+static void *pluginsd_worker_thread(void *arg) {
worker_register("PLUGINSD");
netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg);
@@ -151,14 +151,20 @@ static void *pluginsd_worker_thread(void *arg)
while (service_running(SERVICE_COLLECTORS)) {
FILE *fp_child_input = NULL;
FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input);
+
if (unlikely(!fp_child_input || !fp_child_output)) {
- error("Cannot popen(\"%s\", \"r\").", cd->cmd);
+ error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", rrdhost_hostname(cd->host), cd->cmd);
break;
}
- info("connected to '%s' running on pid %d", cd->fullfilename, cd->unsafe.pid);
- count = pluginsd_process(localhost, cd, fp_child_input, fp_child_output, 0);
- error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->unsafe.pid, count);
+ info("PLUGINSD: 'host:%s' connected to '%s' running on pid %d",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid);
+
+ count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0);
+
+ info("PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count);
+
killpid(cd->unsafe.pid);
int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid);
@@ -172,29 +178,29 @@ static void *pluginsd_worker_thread(void *arg)
if (unlikely(!plugin_is_enabled(cd)))
break;
}
- worker_unregister();
netdata_thread_cleanup_pop(1);
return NULL;
}
-static void pluginsd_main_cleanup(void *data)
-{
+static void pluginsd_main_cleanup(void *data) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)data;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
- info("cleaning up...");
+ info("PLUGINSD: cleaning up...");
struct plugind *cd;
for (cd = pluginsd_root; cd; cd = cd->next) {
netdata_spinlock_lock(&cd->unsafe.spinlock);
if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) {
- info("stopping plugin thread: %s", cd->id);
+ info("PLUGINSD: 'host:%s', stopping plugin thread: %s",
+ rrdhost_hostname(cd->host), cd->id);
+
netdata_thread_cancel(cd->unsafe.thread);
}
netdata_spinlock_unlock(&cd->unsafe.spinlock);
}
- info("cleanup completed.");
+ info("PLUGINSD: cleanup completed.");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
worker_unregister();
@@ -282,6 +288,7 @@ void *pluginsd_main(void *ptr)
strncpyz(cd->filename, file->d_name, FILENAME_MAX);
snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename);
+ cd->host = localhost;
cd->unsafe.enabled = enabled;
cd->unsafe.running = false;
@@ -294,9 +301,7 @@ void *pluginsd_main(void *ptr)
config_get(cd->id, "command options", def));
// link it
- if (likely(pluginsd_root))
- cd->next = pluginsd_root;
- pluginsd_root = cd;
+ DOUBLE_LINKED_LIST_PREPEND_ITEM_UNSAFE(pluginsd_root, cd, prev, next);
if (plugin_is_enabled(cd)) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index 5c7da73361..865488d379 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -38,6 +38,11 @@
#define PLUGINSD_KEYWORD_SET_V2 "SET2"
#define PLUGINSD_KEYWORD_END_V2 "END2"
+#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE"
+#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END"
+#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL"
+#define PLUGINSD_KEYWORD_HOST "HOST"
+
#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds
#define PLUGINSD_LINE_MAX_SSL_READ 512
@@ -60,6 +65,7 @@ struct plugind {
size_t serial_failures; // the number of times the plugin started
// without collecting values
+ RRDHOST *host; // the host the plugin collects data for
int update_every; // the plugin default data collection frequency
struct {
@@ -71,7 +77,8 @@ struct plugind {
} unsafe;
time_t started_t;
- uint32_t capabilities; // follows the same principles as streaming capabilities
+
+ struct plugind *prev;
struct plugind *next;
};
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 448e13de28..891968e85e 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -152,7 +152,7 @@ static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, cons
if(likely(st->pluginsd.pos < st->pluginsd.used)) {
rda = st->pluginsd.rda[st->pluginsd.pos];
RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
- if (likely(rd && strcmp(rrddim_id(rd), dimension) == 0)) {
+ if (likely(rd && string_strcmp(rd->id, dimension) == 0)) {
st->pluginsd.pos++;
return rd;
}
@@ -297,6 +297,146 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
+static void pluginsd_host_define_cleanup(void *user) {
+ PARSER_USER_OBJECT *u = user;
+
+ string_freez(u->host_define.hostname);
+ dictionary_destroy(u->host_define.rrdlabels);
+
+ u->host_define.hostname = NULL;
+ u->host_define.rrdlabels = NULL;
+ u->host_define.parsing_host = false;
+}
+
+static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
+ if(uuid_parse(guid, *uuid))
+ return false;
+
+ uuid_unparse_lower(*uuid, output);
+
+ return true;
+}
+
+static PARSER_RC pluginsd_host_define(char **words, size_t num_words, void *user) {
+ PARSER_USER_OBJECT *u = user;
+
+ char *guid = get_word(words, num_words, 1);
+ char *hostname = get_word(words, num_words, 2);
+
+ if(unlikely(!guid || !*guid || !hostname || !*hostname))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
+
+ if(unlikely(u->host_define.parsing_host))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE,
+ "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
+
+ if(!pluginsd_validate_machine_guid(guid, &u->host_define.machine_guid, u->host_define.machine_guid_str))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
+
+ u->host_define.hostname = string_strdupz(hostname);
+ u->host_define.rrdlabels = rrdlabels_create();
+ u->host_define.parsing_host = true;
+
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, void *user, DICTIONARY *dict, const char *keyword) {
+ PARSER_USER_OBJECT *u = user;
+
+ char *name = get_word(words, num_words, 1);
+ char *value = get_word(words, num_words, 2);
+
+ if(!name || !*name || !value)
+ return PLUGINSD_DISABLE_PLUGIN(user, keyword, "missing parameters");
+
+ if(!u->host_define.parsing_host || !dict)
+ return PLUGINSD_DISABLE_PLUGIN(user, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
+
+ rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG);
+
+ return PARSER_RC_OK;
+}
+
+static PARSER_RC pluginsd_host_labels(char **words, size_t num_words, void *user) {
+ PARSER_USER_OBJECT *u = user;
+ return pluginsd_host_dictionary(words, num_words, user, u->host_define.rrdlabels, PLUGINSD_KEYWORD_HOST_LABEL);
+}
+
+static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) {
+ PARSER_USER_OBJECT *u = user;
+
+ if(!u->host_define.parsing_host)
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
+
+ RRDHOST *host = rrdhost_find_or_create(
+ string2str(u->host_define.hostname),
+ string2str(u->host_define.hostname),
+ u->host_define.machine_guid_str,
+ "Netdata Virtual Host 1.0",
+ netdata_configured_timezone,
+ netdata_configured_abbrev_timezone,
+ netdata_configured_utc_offset,
+ NULL,
+ program_name,
+ program_version,
+ default_rrd_update_every,
+ default_rrd_history_entries,
+ default_rrd_memory_mode,
+ default_health_enabled,
+ default_rrdpush_enabled,
+ default_rrdpush_destination,
+ default_rrdpush_api_key,
+ default_rrdpush_send_charts_matching,
+ default_rrdpush_enable_replication,
+ default_rrdpush_seconds_to_replicate,
+ default_rrdpush_replication_step,
+ rrdhost_labels_to_system_info(u->host_define.rrdlabels),
+ false
+ );
+
+ if(host->rrdlabels) {
+ rrdlabels_migrate_to_these(host->rrdlabels, u->host_define.rrdlabels);
+ }
+ else {
+ host->rrdlabels = u->host_define.rrdlabels;
+ u->host_define.rrdlabels = NULL;
+ }
+
+ pluginsd_host_define_cleanup(user);
+
+ u->host = host;
+ pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END);
+
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+ rrdcontext_host_child_connected(host);
+
+ return PARSER_RC_OK;
+}
+
+static PARSER_RC pluginsd_host(char **words, size_t num_words, void *user) {
+ PARSER_USER_OBJECT *u = user;
+
+ char *guid = get_word(words, num_words, 1);
+
+ if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
+ u->host = localhost;
+ return PARSER_RC_OK;
+ }
+
+ uuid_t uuid;
+ char uuid_str[UUID_STR_LEN];
+ if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
+
+ RRDHOST *host = rrdhost_find_by_guid(uuid_str);
+ if(unlikely(!host))
+ return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
+
+ u->host = host;
+
+ return PARSER_RC_OK;
+}
+
PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
{
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART);
@@ -883,7 +1023,7 @@ PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe
{
info("PLUGINSD: plugin called DISABLE. Disabling it.");
((PARSER_USER_OBJECT *) user)->enabled = 0;
- return PARSER_RC_ERROR;
+ return PARSER_RC_STOP;
}
PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
@@ -1654,8 +1794,8 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_
static void pluginsd_process_thread_cleanup(void *ptr) {
PARSER *parser = (PARSER *)ptr;
- if(parser->user_cleanup_cb)
- parser->user_cleanup_cb(parser->user);
+ pluginsd_cleanup_v2(parser->user);
+ pluginsd_host_define_cleanup(parser->user);
rrd_collector_finished();
parser_destroy(parser);
@@ -1695,7 +1835,10 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
};
// fp_plugin_output = our input; fp_plugin_input = our output
- PARSER *parser = parser_init(host, &user, NULL, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
+ PARSER *parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1,
+ PARSER_INPUT_SPLIT, NULL);
+
+ pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
rrd_collector_started();
@@ -1704,9 +1847,10 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
user.parser = parser;
+ char buffer[PLUGINSD_LINE_MAX + 1];
- while (likely(!parser_next(parser))) {
- if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, NULL)))
+ while (likely(!parser_next(parser, buffer, PLUGINSD_LINE_MAX))) {
+ if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, buffer)))
break;
}
@@ -1725,3 +1869,133 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
return count;
}
+
+static void pluginsd_keywords_init_internal(PARSER *parser, PLUGINSD_KEYWORDS types, void (*add_func)(PARSER *parser, char *keyword, keyword_function func)) {
+
+ if (types & PARSER_INIT_PLUGINSD) {
+ add_func(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush);
+ add_func(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable);
+
+ add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE, pluginsd_host_define);
+ add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, pluginsd_host_define_end);
+ add_func(parser, PLUGINSD_KEYWORD_HOST_LABEL, pluginsd_host_labels);
+ add_func(parser, PLUGINSD_KEYWORD_HOST, pluginsd_host);
+ }
+
+ if (types & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) {
+ // plugins.d plugins and streaming
+ add_func(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart);
+ add_func(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension);
+ add_func(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable);
+ add_func(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label);
+ add_func(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite);
+ add_func(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit);
+ add_func(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel);
+ add_func(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function);
+ add_func(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin);
+
+ add_func(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin);
+ add_func(parser, PLUGINSD_KEYWORD_SET, pluginsd_set);
+ add_func(parser, PLUGINSD_KEYWORD_END, pluginsd_end);
+
+ inflight_functions_init(parser);
+ }
+
+ if (types & PARSER_INIT_STREAMING) {
+ add_func(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end);
+
+ // replication
+ add_func(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_begin);
+ add_func(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set);
+ add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state);
+ add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state);
+ add_func(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end);
+
+ // streaming metrics v2
+ add_func(parser, PLUGINSD_KEYWORD_BEGIN_V2, pluginsd_begin_v2);
+ add_func(parser, PLUGINSD_KEYWORD_SET_V2, pluginsd_set_v2);
+ add_func(parser, PLUGINSD_KEYWORD_END_V2, pluginsd_end_v2);
+ }
+}
+
+void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types) {
+ pluginsd_keywords_init_internal(parser, types, parser_add_keyword);
+}
+
+struct pluginsd_user_unittest {
+ size_t size;
+ const char **hashtable;
+ uint32_t (*hash)(const char *s);
+ size_t collisions;
+};
+
+void pluginsd_keyword_collision_check(PARSER *parser, char *keyword, keyword_function func __maybe_unused) {
+ struct pluginsd_user_unittest *u = parser->user;
+
+ uint32_t hash = u->hash(keyword);
+ uint32_t slot = hash % u->size;
+
+ if(u->hashtable[slot])
+ u->collisions++;
+
+ u->hashtable[slot] = keyword;
+}
+
+static struct {
+ const char *name;
+ uint32_t (*hash)(const char *s);
+ size_t slots_needed;
+} hashers[] = {
+ { .name = "djb2_hash32(s)", djb2_hash32, .slots_needed = 0, },
+ { .name = "fnv1_hash32(s)", fnv1_hash32, .slots_needed = 0, },
+ { .name = "fnv1a_hash32(s)", fnv1a_hash32, .slots_needed = 0, },
+ { .name = "larson_hash32(s)", larson_hash32, .slots_needed = 0, },
+ { .name = "pluginsd_parser_hash32(s)", pluginsd_parser_hash32, .slots_needed = 0, },
+
+ // terminator
+ { .name = NULL, NULL, .slots_needed = 0, },
+};
+
+int pluginsd_parser_unittest(void) {
+ PARSER *p;
+ size_t slots_to_check = 1000;
+ size_t i, h;
+
+ // check for hashtable collisions
+ for(h = 0; hashers[h].name ;h++) {
+ hashers[h].slots_needed = slots_to_check * 1000000;
+
+ for (i = 10; i < slots_to_check; i++) {
+ struct pluginsd_user_unittest user = {
+ .hash = hashers[h].hash,
+ .size = i,
+ .hashtable = callocz(i, sizeof(const char *)),
+ .collisions = 0,
+ };
+
+ p = parser_init(&user, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
+ pluginsd_keywords_init_internal(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING,
+ pluginsd_keyword_collision_check);
+ parser_destroy(p);
+
+ freez(user.hashtable);
+
+ if (!user.collisions) {
+ hashers[h].slots_needed = i;
+ break;
+ }
+ }
+ }
+
+ for(h = 0; hashers[h].name ;h++) {
+ if(hashers[h].slots_needed > 1000)
+ info("PARSER: hash function '%s' cannot be used without collisions under %zu slots", hashers[h].name, slots_to_check);
+ else
+ info("PARSER: hash function '%s' needs PARSER_KEYWORDS_HASHTABLE_SIZE (in parser.h) set to %zu", hashers[h].name, hashers[h].slots_needed);
+ }
+
+ p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
+ pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
+ parser_destroy(p);
+ return 0;
+}
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index dc43630ed5..57829ca349 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -3,7 +3,12 @@
#ifndef NETDATA_PLUGINSD_PARSER_H
#define NETDATA_PLUGINSD_PARSER_H
-#include "parser/parser.h"
+#include "daemon/common.h"
+
+typedef enum __attribute__ ((__packed__)) {
+ PARSER_INIT_PLUGINSD = (1 << 1),
+ PARSER_INIT_STREAMING = (1 << 2),
+} PLUGINSD_KEYWORDS;
typedef struct parser_user_object {
PARSER *parser;
@@ -17,6 +22,14 @@ typedef struct parser_user_object {
size_t data_collections_count;
int enabled;
+ struct {
+ bool parsing_host;
+ uuid_t machine_guid;
+ char machine_guid_str[UUID_STR_LEN];
+ STRING *hostname;
+ DICTIONARY *rrdlabels;
+ } host_define;
+
struct parser_user_object_replay {
time_t start_time;
time_t end_time;
@@ -42,4 +55,6 @@ typedef struct parser_user_object {
PARSER_RC pluginsd_function(char **words, size_t num_words, void *user);
PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user);
void inflight_functions_init(PARSER *parser);
+void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types);
+
#endif //NETDATA_PLUGINSD_PARSER_H