summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d/pluginsd_parser.c
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c494
1 files changed, 426 insertions, 68 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index fbcd812627..d568db5ca2 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -4,6 +4,9 @@
#define LOG_FUNCTIONS false
+#define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING)
+#define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD)
+
static ssize_t send_to_plugin(const char *txt, void *data) {
PARSER *parser = data;
@@ -742,6 +745,7 @@ struct inflight_function {
usec_t sent_ut;
const char *payload;
PARSER *parser;
+ bool virtual;
};
static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
@@ -808,16 +812,83 @@ static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __m
return false;
}
-static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) {
+void delete_job_finalize(struct parser *parser, struct configurable_plugin *plug, const char *fnc_sig, int code) {
+ if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED)
+ return;
+
+ char *params_local = strdupz(fnc_sig);
+ char *words[DYNCFG_MAX_WORDS];
+ size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd);
+
+ if (words_c != 3) {
+ netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for delete_job");
+ freez(params_local);
+ return;
+ }
+
+ const char *module = words[1];
+ const char *job = words[2];
+
+ delete_job(plug, module, job);
+
+ unlink_job(plug->name, module, job);
+
+ rrdpush_send_job_deleted(localhost, plug->name, module, job);
+
+ freez(params_local);
+}
+
+void set_job_finalize(struct parser *parser, struct configurable_plugin *plug, const char *fnc_sig, int code) {
+ if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED)
+ return;
+
+ char *params_local = strdupz(fnc_sig);
+ char *words[DYNCFG_MAX_WORDS];
+ size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd);
+
+ if (words_c != 3) {
+ netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for set_job_config");
+ freez(params_local);
+ return;
+ }
+
+ const char *module_name = get_word(words, words_c, 1);
+ const char *job_name = get_word(words, words_c, 2);
+
+ if (register_job(parser->user.host->configurable_plugins, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED, 1)) {
+ freez(params_local);
+ return;
+ }
+
+ // only send this if it is not existing already (register_job cares for that)
+ rrdpush_send_dyncfg_reg_job(localhost, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED);
+
+ freez(params_local);
+}
+
+static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) {
struct inflight_function *pf = func;
+ struct parser *parser = (struct parser *)parser_ptr;
internal_error(LOG_FUNCTIONS,
"FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)",
string2str(pf->function), dictionary_acquired_item_name(item),
buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
+ if (pf->virtual && SERVING_PLUGINSD(parser)) {
+ if (pf->payload) {
+ if (strncmp(string2str(pf->function), FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0)
+ set_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code);
+ dyn_conf_store_config(string2str(pf->function), pf->payload, parser->user.cd->configuration);
+ } else if (strncmp(string2str(pf->function), FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) {
+ delete_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code);
+ }
+ }
+
pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data);
+
string_freez(pf->function);
+ freez(pf->payload);
}
void inflight_functions_init(PARSER *parser) {
@@ -1947,9 +2018,81 @@ static void virt_fnc_got_data_cb(BUFFER *wb __maybe_unused, int code, void *call
}
#define VIRT_FNC_TIMEOUT 1
+#define VIRT_FNC_BUF_SIZE (4096)
+void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data) {
+ PARSER *parser = NULL;
+
+ //TODO simplify (as we really need only first parameter to get plugin name maybe we can avoid parsing all)
+ char *words[PLUGINSD_MAX_WORDS];
+ char *function_with_params = strdupz(name);
+ size_t num_words = quoted_strings_splitter(function_with_params, words, PLUGINSD_MAX_WORDS, isspace_map_pluginsd);
+
+ if (num_words < 2) {
+ netdata_log_error("PLUGINSD: virtual function name is empty.");
+ freez(function_with_params);
+ return;
+ }
+
+ const DICTIONARY_ITEM *cpi = dictionary_get_and_acquire_item(host->configurable_plugins, get_word(words, num_words, 1));
+ if (unlikely(cpi == NULL)) {
+ netdata_log_error("PLUGINSD: virtual function plugin '%s' not found.", name);
+ freez(function_with_params);
+ return;
+ }
+ struct configurable_plugin *cp = dictionary_acquired_item_value(cpi);
+ parser = (PARSER *)cp->cb_usr_ctx;
+
+ BUFFER *function_out = buffer_create(VIRT_FNC_BUF_SIZE, NULL);
+ // if we are forwarding this to a plugin (as opposed to streaming/child) we have to remove the first parameter (plugin_name)
+ buffer_strcat(function_out, get_word(words, num_words, 0));
+ for (size_t i = 1; i < num_words; i++) {
+ if (i == 1 && SERVING_PLUGINSD(parser))
+ continue;
+ buffer_sprintf(function_out, " %s", get_word(words, num_words, i));
+ }
+ freez(function_with_params);
+
+ usec_t now = now_realtime_usec();
+
+ struct inflight_function tmp = {
+ .started_ut = now,
+ .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC,
+ .result_body_wb = wb,
+ .timeout = VIRT_FNC_TIMEOUT * 10,
+ .function = string_strdupz(buffer_tostring(function_out)),
+ .result_cb = callback,
+ .result_cb_data = callback_data,
+ .payload = payload != NULL ? strdupz(payload) : NULL,
+ .virtual = true,
+ };
+ buffer_free(function_out);
+
+ uuid_t uuid;
+ uuid_generate_time(uuid);
+
+ char key[UUID_STR_LEN];
+ uuid_unparse_lower(uuid, key);
+
+ dictionary_write_lock(parser->inflight.functions);
+
+ // if there is any error, our dictionary callbacks will call the caller callback to notify
+ // the caller about the error - no need for error handling here.
+ dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
+
+ if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
+ parser->inflight.smaller_timeout = tmp.timeout_ut;
+
+ // garbage collect stale inflight functions
+ if(parser->inflight.smaller_timeout < now)
+ inflight_functions_garbage_collect(parser, now);
+
+ dictionary_write_unlock(parser->inflight.functions);
+}
+
+
dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) {
usec_t now = now_realtime_usec();
- BUFFER *wb = buffer_create(4096, NULL);
+ BUFFER *wb = buffer_create(VIRT_FNC_BUF_SIZE, NULL);
struct mutex_cond cond = {
.lock = PTHREAD_MUTEX_INITIALIZER,
@@ -1964,7 +2107,8 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name,
.function = string_strdupz(name),
.result_cb = virt_fnc_got_data_cb,
.result_cb_data = &cond,
- .payload = payload,
+ .payload = payload != NULL ? strdupz(payload) : NULL,
+ .virtual = true,
};
uuid_t uuid;
@@ -2011,98 +2155,188 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name,
return cfg;
}
-static dyncfg_config_t get_plugin_config_cb(void *usr_ctx)
+#define CVF_MAX_LEN (1024)
+static dyncfg_config_t get_plugin_config_cb(void *usr_ctx, const char *plugin_name)
{
PARSER *parser = usr_ctx;
- return call_virtual_function_blocking(parser, "get_plugin_config", NULL, NULL);
+
+ if (SERVING_STREAMING(parser)) {
+ char buf[CVF_MAX_LEN + 1];
+ snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG " %s", plugin_name);
+ return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ }
+
+ return call_virtual_function_blocking(parser, FUNCTION_NAME_GET_PLUGIN_CONFIG, NULL, NULL);
}
-static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx)
+static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx, const char *plugin_name)
{
PARSER *parser = usr_ctx;
+
+ if (SERVING_STREAMING(parser)) {
+ char buf[CVF_MAX_LEN + 1];
+ snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA " %s", plugin_name);
+ return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ }
+
return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL);
}
-static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *module_name)
+static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_module_config %s", module_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG);
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *module_name)
+static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_module_config_schema %s", module_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA);
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *module_name)
+static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_job_config_schema %s", module_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *module_name, const char* job_name)
+static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char* job_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_job_config %s %s", module_name, job_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s %s", module_name, job_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-enum set_config_result set_plugin_config_cb(void *usr_ctx, dyncfg_config_t *cfg)
+enum set_config_result set_plugin_config_cb(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_SET_PLUGIN_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
int rc;
- call_virtual_function_blocking(parser, "set_plugin_config", &rc, cfg->data);
- if(rc != 1)
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
+
+ buffer_free(wb);
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
-enum set_config_result set_module_config_cb(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg)
+enum set_config_result set_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_SET_MODULE_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
int rc;
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
- char buf[1024];
- snprintfz(buf, sizeof(buf), "set_module_config %s", module_name);
- call_virtual_function_blocking(parser, buf, &rc, cfg->data);
+ buffer_free(wb);
- if(rc != 1)
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
-enum set_config_result set_job_config_cb(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg)
+enum set_config_result set_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_SET_JOB_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s %s", module_name, job_name);
+
int rc;
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
- char buf[1024];
- snprintfz(buf, sizeof(buf), "set_job_config %s %s", module_name, job_name);
- call_virtual_function_blocking(parser, buf, &rc, cfg->data);
+ buffer_free(wb);
- if(rc != 1)
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
-enum set_config_result delete_job_cb(void *usr_ctx, const char *module_name, const char *job_name)
+enum set_config_result delete_job_cb(void *usr_ctx, const char *plugin_name ,const char *module_name, const char *job_name)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_DELETE_JOB);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s %s", module_name, job_name);
+
int rc;
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, NULL);
- char buf[1024];
- snprintfz(buf, sizeof(buf), "delete_job %s %s", module_name, job_name);
- call_virtual_function_blocking(parser, buf, &rc, NULL);
+ buffer_free(wb);
- if(rc != 1)
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
@@ -2122,37 +2356,65 @@ static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, si
cfg->get_config_schema_cb = get_plugin_config_schema_cb;
cfg->cb_usr_ctx = parser;
- parser->user.cd->cfg_dict_item = register_plugin(cfg);
-
- if (unlikely(parser->user.cd->cfg_dict_item == NULL)) {
+ const DICTIONARY_ITEM *di = register_plugin(parser->user.host->configurable_plugins, cfg, SERVING_PLUGINSD(parser));
+ if (unlikely(di == NULL)) {
freez(cfg->name);
freez(cfg);
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin");
}
- parser->user.cd->configuration = cfg;
+ if (SERVING_PLUGINSD(parser)) {
+ // this is optimization for pluginsd to avoid extra dictionary lookup
+ // as we know which plugin is comunicating with us
+ parser->user.cd->cfg_dict_item = di;
+ parser->user.cd->configuration = cfg;
+ } else {
+ // register_plugin keeps the item acquired, so we need to release it
+ dictionary_acquired_item_release(parser->user.host->configurable_plugins, di);
+ }
+
+ rrdpush_send_dyncfg_enable(parser->user.host, cfg->name);
+
return PARSER_RC_OK;
}
+#define LOG_MSG_SIZE (1024)
+#define MODULE_NAME_IDX (SERVING_PLUGINSD(parser) ? 1 : 2)
+#define MODULE_TYPE_IDX (SERVING_PLUGINSD(parser) ? 2 : 3)
static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE");
- struct configurable_plugin *plug_cfg = parser->user.cd->configuration;
- if (unlikely(plug_cfg == NULL))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE);
+ size_t expected_num_words = SERVING_PLUGINSD(parser) ? 3 : 4;
- if (unlikely(num_words != 3))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "expected 2 parameters module_name followed by module_type");
+ if (unlikely(num_words != expected_num_words)) {
+ char log[LOG_MSG_SIZE + 1];
+ snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name module_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name ");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, log);
+ }
+
+ struct configurable_plugin *plug_cfg;
+ const DICTIONARY_ITEM *di = NULL;
+ if (SERVING_PLUGINSD(parser)) {
+ plug_cfg = parser->user.cd->configuration;
+ if (unlikely(plug_cfg == NULL))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE);
+ } else {
+ di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]);
+ if (unlikely(di == NULL))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "plugin not found");
+
+ plug_cfg = (struct configurable_plugin *)dictionary_acquired_item_value(di);
+ }
struct module *mod = callocz(1, sizeof(struct module));
- mod->type = str2_module_type(words[2]);
+ mod->type = str2_module_type(words[MODULE_TYPE_IDX]);
if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) {
freez(mod);
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)");
}
- mod->name = strdupz(words[1]);
+ mod->name = strdupz(words[MODULE_NAME_IDX]);
mod->set_config_cb = set_module_config_cb;
mod->get_config_cb = get_module_config_cb;
@@ -2165,27 +2427,111 @@ static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, si
mod->delete_job_cb = delete_job_cb;
mod->job_config_cb_usr_ctx = parser;
- register_module(plug_cfg, mod);
+ register_module(parser->user.host->configurable_plugins, plug_cfg, mod, SERVING_PLUGINSD(parser));
+
+ if (di != NULL)
+ dictionary_acquired_item_release(parser->user.host->configurable_plugins, di);
+
+ rrdpush_send_dyncfg_reg_module(parser->user.host, plug_cfg->name, mod->name, mod->type);
+
return PARSER_RC_OK;
}
-// job_status <module_name> <job_name> <status_code> <state> <message>
-static inline PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser)
-{
- if (unlikely(num_words != 6 && num_words != 5))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]");
+static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) {
+ if (atol(words[3]) < 0)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags");
+ dyncfg_job_flg_t flags = atol(words[3]);
+ if (SERVING_PLUGINSD(parser))
+ flags |= JOB_FLG_PLUGIN_PUSHED;
+ else
+ flags |= JOB_FLG_STREAMING_PUSHED;
+
+ enum job_type job_type = str2job_type(words[2]);
+ if (job_type == JOB_TYPE_UNKNOWN)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type");
+ if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)");
+
+ if (register_job(parser->user.host->configurable_plugins, plugin_name, words[0], words[1], job_type, flags, 0)) // ignore existing is off as this is explicitly called register job
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job");
+
+ rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, words[0], words[1], job_type, flags);
+ return PARSER_RC_OK;
+}
+
+static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
+ size_t expected_num_words = SERVING_PLUGINSD(parser) ? 5 : 6;
+
+ if (unlikely(num_words != expected_num_words)) {
+ char log[LOG_MSG_SIZE + 1];
+ snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name job_name job_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name ");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, log);
+ }
- int state = atoi(words[4]);
+ if (SERVING_PLUGINSD(parser)) {
+ return pluginsd_register_job_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name);
+ }
+ return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]);
+}
- enum job_status job_status = str2job_state(words[3]);
- if (unlikely(job_status == JOB_STATUS_UNKNOWN))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job state");
+static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) {
+ int state = str2i(words[3]);
+
+ enum job_status status = str2job_state(words[2]);
+ if (unlikely(SERVING_PLUGINSD(parser) && status == JOB_STATUS_UNKNOWN))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status");
char *message = NULL;
- if (num_words == 6)
- message = strdupz(words[5]);
+ if (num_words == 5)
+ message = words[4];
+
+ const DICTIONARY_ITEM *plugin_item;
+ DICTIONARY *job_dict;
+ const DICTIONARY_ITEM *job_item = report_job_status_acq_lock(parser->user.host->configurable_plugins, &plugin_item, &job_dict, plugin_name, words[0], words[1], status, state, message);
+
+ if (job_item != NULL) {
+ struct job *job = dictionary_acquired_item_value(job_item);
+ rrdpush_send_job_status_update(parser->user.host, plugin_name, words[0], job);
+
+ pthread_mutex_unlock(&job->lock);
+ dictionary_acquired_item_release(job_dict, job_item);
+ dictionary_acquired_item_release(parser->user.host->configurable_plugins, plugin_item);
+ }
- report_job_status(parser->user.cd->configuration, words[1], words[2], job_status, state, message);
+ return PARSER_RC_OK;
+}
+
+// job_status [plugin_name if streaming] <module_name> <job_name> <status_code> <state> [message]
+static PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) {
+ if (SERVING_PLUGINSD(parser)) {
+ if (unlikely(num_words != 5 && num_words != 6))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]");
+ } else {
+ if (unlikely(num_words != 6 && num_words != 7))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 5 or 6 parameters: plugin_name, module_name, job_name, status_code, state, [optional: message]");
+ }
+
+ if (SERVING_PLUGINSD(parser)) {
+ return pluginsd_job_status_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name);
+ }
+ return pluginsd_job_status_common(&words[2], num_words - 2, parser, words[1]);
+}
+
+static PARSER_RC pluginsd_delete_job(char **words, size_t num_words, PARSER *parser) {
+ // this can confuse a bit but there is a diference between KEYWORD_DELETE_JOB and actual delete_job function
+ // they are of opossite direction
+ if (num_words != 4)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DELETE_JOB, "expected 2 parameters: plugin_name, module_name, job_name");
+
+ const char *plugin_name = get_word(words, num_words, 1);
+ const char *module_name = get_word(words, num_words, 2);
+ const char *job_name = get_word(words, num_words, 3);
+
+ if (SERVING_STREAMING(parser))
+ delete_job_pname(parser->user.host->configurable_plugins, plugin_name, module_name, job_name);
+
+ // forward to parent if any
+ rrdpush_send_job_deleted(parser->user.host, plugin_name, module_name, job_name);
return PARSER_RC_OK;
}
@@ -2506,8 +2852,14 @@ PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words,
case 102:
return pluginsd_register_module(words, num_words, parser);
+ case 103:
+ return pluginsd_register_job(words, num_words, parser);
+
case 110:
return pluginsd_job_status(words, num_words, parser);
+
+ case 111:
+ return pluginsd_delete_job(words, num_words, parser);
default:
fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
@@ -2525,14 +2877,20 @@ void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
}
}
+static void parser_destroy_dyncfg(PARSER *parser) {
+ if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) {
+ unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item);
+ parser->user.cd->configuration = NULL;
+ } else if (parser->user.host != NULL && SERVING_STREAMING(parser) && parser->user.host != localhost){
+ dictionary_flush(parser->user.host->configurable_plugins);
+ }
+}
+
void parser_destroy(PARSER *parser) {
if (unlikely(!parser))
return;
- if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) {
- unregister_plugin(parser->user.cd->cfg_dict_item);
- parser->user.cd->configuration = NULL;
- }
+ parser_destroy_dyncfg(parser);
dictionary_destroy(parser->inflight.functions);
freez(parser);