diff options
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 494 |
1 files changed, 426 insertions, 68 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index fbcd812627..d568db5ca2 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -4,6 +4,9 @@ #define LOG_FUNCTIONS false +#define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING) +#define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD) + static ssize_t send_to_plugin(const char *txt, void *data) { PARSER *parser = data; @@ -742,6 +745,7 @@ struct inflight_function { usec_t sent_ut; const char *payload; PARSER *parser; + bool virtual; }; static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) { @@ -808,16 +812,83 @@ static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __m return false; } -static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) { +void delete_job_finalize(struct parser *parser, struct configurable_plugin *plug, const char *fnc_sig, int code) { + if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED) + return; + + char *params_local = strdupz(fnc_sig); + char *words[DYNCFG_MAX_WORDS]; + size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); + + if (words_c != 3) { + netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for delete_job"); + freez(params_local); + return; + } + + const char *module = words[1]; + const char *job = words[2]; + + delete_job(plug, module, job); + + unlink_job(plug->name, module, job); + + rrdpush_send_job_deleted(localhost, plug->name, module, job); + + freez(params_local); +} + +void set_job_finalize(struct parser *parser, struct configurable_plugin *plug, const char *fnc_sig, int code) { + if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED) + return; + + char *params_local = strdupz(fnc_sig); + char *words[DYNCFG_MAX_WORDS]; + size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); + + if (words_c != 3) { + netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for set_job_config"); + freez(params_local); + return; + } + + const char *module_name = get_word(words, words_c, 1); + const char *job_name = get_word(words, words_c, 2); + + if (register_job(parser->user.host->configurable_plugins, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED, 1)) { + freez(params_local); + return; + } + + // only send this if it is not existing already (register_job cares for that) + rrdpush_send_dyncfg_reg_job(localhost, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED); + + freez(params_local); +} + +static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) { struct inflight_function *pf = func; + struct parser *parser = (struct parser *)parser_ptr; internal_error(LOG_FUNCTIONS, "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)", string2str(pf->function), dictionary_acquired_item_name(item), buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut); + if (pf->virtual && SERVING_PLUGINSD(parser)) { + if (pf->payload) { + if (strncmp(string2str(pf->function), FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0) + set_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code); + dyn_conf_store_config(string2str(pf->function), pf->payload, parser->user.cd->configuration); + } else if (strncmp(string2str(pf->function), FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) { + delete_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code); + } + } + pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data); + string_freez(pf->function); + freez(pf->payload); } void inflight_functions_init(PARSER *parser) { @@ -1947,9 +2018,81 @@ static void virt_fnc_got_data_cb(BUFFER *wb __maybe_unused, int code, void *call } #define VIRT_FNC_TIMEOUT 1 +#define VIRT_FNC_BUF_SIZE (4096) +void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data) { + PARSER *parser = NULL; + + //TODO simplify (as we really need only first parameter to get plugin name maybe we can avoid parsing all) + char *words[PLUGINSD_MAX_WORDS]; + char *function_with_params = strdupz(name); + size_t num_words = quoted_strings_splitter(function_with_params, words, PLUGINSD_MAX_WORDS, isspace_map_pluginsd); + + if (num_words < 2) { + netdata_log_error("PLUGINSD: virtual function name is empty."); + freez(function_with_params); + return; + } + + const DICTIONARY_ITEM *cpi = dictionary_get_and_acquire_item(host->configurable_plugins, get_word(words, num_words, 1)); + if (unlikely(cpi == NULL)) { + netdata_log_error("PLUGINSD: virtual function plugin '%s' not found.", name); + freez(function_with_params); + return; + } + struct configurable_plugin *cp = dictionary_acquired_item_value(cpi); + parser = (PARSER *)cp->cb_usr_ctx; + + BUFFER *function_out = buffer_create(VIRT_FNC_BUF_SIZE, NULL); + // if we are forwarding this to a plugin (as opposed to streaming/child) we have to remove the first parameter (plugin_name) + buffer_strcat(function_out, get_word(words, num_words, 0)); + for (size_t i = 1; i < num_words; i++) { + if (i == 1 && SERVING_PLUGINSD(parser)) + continue; + buffer_sprintf(function_out, " %s", get_word(words, num_words, i)); + } + freez(function_with_params); + + usec_t now = now_realtime_usec(); + + struct inflight_function tmp = { + .started_ut = now, + .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC, + .result_body_wb = wb, + .timeout = VIRT_FNC_TIMEOUT * 10, + .function = string_strdupz(buffer_tostring(function_out)), + .result_cb = callback, + .result_cb_data = callback_data, + .payload = payload != NULL ? strdupz(payload) : NULL, + .virtual = true, + }; + buffer_free(function_out); + + uuid_t uuid; + uuid_generate_time(uuid); + + char key[UUID_STR_LEN]; + uuid_unparse_lower(uuid, key); + + dictionary_write_lock(parser->inflight.functions); + + // if there is any error, our dictionary callbacks will call the caller callback to notify + // the caller about the error - no need for error handling here. + dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); + + if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) + parser->inflight.smaller_timeout = tmp.timeout_ut; + + // garbage collect stale inflight functions + if(parser->inflight.smaller_timeout < now) + inflight_functions_garbage_collect(parser, now); + + dictionary_write_unlock(parser->inflight.functions); +} + + dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) { usec_t now = now_realtime_usec(); - BUFFER *wb = buffer_create(4096, NULL); + BUFFER *wb = buffer_create(VIRT_FNC_BUF_SIZE, NULL); struct mutex_cond cond = { .lock = PTHREAD_MUTEX_INITIALIZER, @@ -1964,7 +2107,8 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, .function = string_strdupz(name), .result_cb = virt_fnc_got_data_cb, .result_cb_data = &cond, - .payload = payload, + .payload = payload != NULL ? strdupz(payload) : NULL, + .virtual = true, }; uuid_t uuid; @@ -2011,98 +2155,188 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, return cfg; } -static dyncfg_config_t get_plugin_config_cb(void *usr_ctx) +#define CVF_MAX_LEN (1024) +static dyncfg_config_t get_plugin_config_cb(void *usr_ctx, const char *plugin_name) { PARSER *parser = usr_ctx; - return call_virtual_function_blocking(parser, "get_plugin_config", NULL, NULL); + + if (SERVING_STREAMING(parser)) { + char buf[CVF_MAX_LEN + 1]; + snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG " %s", plugin_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); + } + + return call_virtual_function_blocking(parser, FUNCTION_NAME_GET_PLUGIN_CONFIG, NULL, NULL); } -static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx) +static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx, const char *plugin_name) { PARSER *parser = usr_ctx; + + if (SERVING_STREAMING(parser)) { + char buf[CVF_MAX_LEN + 1]; + snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA " %s", plugin_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); + } + return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL); } -static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_module_config %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG); + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_module_config_schema %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA); + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_job_config_schema %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *module_name, const char* job_name) +static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char* job_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_job_config %s %s", module_name, job_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -enum set_config_result set_plugin_config_cb(void *usr_ctx, dyncfg_config_t *cfg) +enum set_config_result set_plugin_config_cb(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_PLUGIN_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + int rc; - call_virtual_function_blocking(parser, "set_plugin_config", &rc, cfg->data); - if(rc != 1) + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); + + buffer_free(wb); + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result set_module_config_cb(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg) +enum set_config_result set_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_MODULE_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); - char buf[1024]; - snprintfz(buf, sizeof(buf), "set_module_config %s", module_name); - call_virtual_function_blocking(parser, buf, &rc, cfg->data); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result set_job_config_cb(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg) +enum set_config_result set_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_JOB_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); - char buf[1024]; - snprintfz(buf, sizeof(buf), "set_job_config %s %s", module_name, job_name); - call_virtual_function_blocking(parser, buf, &rc, cfg->data); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result delete_job_cb(void *usr_ctx, const char *module_name, const char *job_name) +enum set_config_result delete_job_cb(void *usr_ctx, const char *plugin_name ,const char *module_name, const char *job_name) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_DELETE_JOB); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, NULL); - char buf[1024]; - snprintfz(buf, sizeof(buf), "delete_job %s %s", module_name, job_name); - call_virtual_function_blocking(parser, buf, &rc, NULL); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } @@ -2122,37 +2356,65 @@ static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, si cfg->get_config_schema_cb = get_plugin_config_schema_cb; cfg->cb_usr_ctx = parser; - parser->user.cd->cfg_dict_item = register_plugin(cfg); - - if (unlikely(parser->user.cd->cfg_dict_item == NULL)) { + const DICTIONARY_ITEM *di = register_plugin(parser->user.host->configurable_plugins, cfg, SERVING_PLUGINSD(parser)); + if (unlikely(di == NULL)) { freez(cfg->name); freez(cfg); return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin"); } - parser->user.cd->configuration = cfg; + if (SERVING_PLUGINSD(parser)) { + // this is optimization for pluginsd to avoid extra dictionary lookup + // as we know which plugin is comunicating with us + parser->user.cd->cfg_dict_item = di; + parser->user.cd->configuration = cfg; + } else { + // register_plugin keeps the item acquired, so we need to release it + dictionary_acquired_item_release(parser->user.host->configurable_plugins, di); + } + + rrdpush_send_dyncfg_enable(parser->user.host, cfg->name); + return PARSER_RC_OK; } +#define LOG_MSG_SIZE (1024) +#define MODULE_NAME_IDX (SERVING_PLUGINSD(parser) ? 1 : 2) +#define MODULE_TYPE_IDX (SERVING_PLUGINSD(parser) ? 2 : 3) static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE"); - struct configurable_plugin *plug_cfg = parser->user.cd->configuration; - if (unlikely(plug_cfg == NULL)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE); + size_t expected_num_words = SERVING_PLUGINSD(parser) ? 3 : 4; - if (unlikely(num_words != 3)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "expected 2 parameters module_name followed by module_type"); + if (unlikely(num_words != expected_num_words)) { + char log[LOG_MSG_SIZE + 1]; + snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name module_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name "); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, log); + } + + struct configurable_plugin *plug_cfg; + const DICTIONARY_ITEM *di = NULL; + if (SERVING_PLUGINSD(parser)) { + plug_cfg = parser->user.cd->configuration; + if (unlikely(plug_cfg == NULL)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE); + } else { + di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]); + if (unlikely(di == NULL)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "plugin not found"); + + plug_cfg = (struct configurable_plugin *)dictionary_acquired_item_value(di); + } struct module *mod = callocz(1, sizeof(struct module)); - mod->type = str2_module_type(words[2]); + mod->type = str2_module_type(words[MODULE_TYPE_IDX]); if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) { freez(mod); return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)"); } - mod->name = strdupz(words[1]); + mod->name = strdupz(words[MODULE_NAME_IDX]); mod->set_config_cb = set_module_config_cb; mod->get_config_cb = get_module_config_cb; @@ -2165,27 +2427,111 @@ static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, si mod->delete_job_cb = delete_job_cb; mod->job_config_cb_usr_ctx = parser; - register_module(plug_cfg, mod); + register_module(parser->user.host->configurable_plugins, plug_cfg, mod, SERVING_PLUGINSD(parser)); + + if (di != NULL) + dictionary_acquired_item_release(parser->user.host->configurable_plugins, di); + + rrdpush_send_dyncfg_reg_module(parser->user.host, plug_cfg->name, mod->name, mod->type); + return PARSER_RC_OK; } -// job_status <module_name> <job_name> <status_code> <state> <message> -static inline PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) -{ - if (unlikely(num_words != 6 && num_words != 5)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]"); +static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) { + if (atol(words[3]) < 0) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags"); + dyncfg_job_flg_t flags = atol(words[3]); + if (SERVING_PLUGINSD(parser)) + flags |= JOB_FLG_PLUGIN_PUSHED; + else + flags |= JOB_FLG_STREAMING_PUSHED; + + enum job_type job_type = str2job_type(words[2]); + if (job_type == JOB_TYPE_UNKNOWN) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type"); + if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)"); + + if (register_job(parser->user.host->configurable_plugins, plugin_name, words[0], words[1], job_type, flags, 0)) // ignore existing is off as this is explicitly called register job + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job"); + + rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, words[0], words[1], job_type, flags); + return PARSER_RC_OK; +} + +static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { + size_t expected_num_words = SERVING_PLUGINSD(parser) ? 5 : 6; + + if (unlikely(num_words != expected_num_words)) { + char log[LOG_MSG_SIZE + 1]; + snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name job_name job_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name "); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, log); + } - int state = atoi(words[4]); + if (SERVING_PLUGINSD(parser)) { + return pluginsd_register_job_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name); + } + return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]); +} - enum job_status job_status = str2job_state(words[3]); - if (unlikely(job_status == JOB_STATUS_UNKNOWN)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job state"); +static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) { + int state = str2i(words[3]); + + enum job_status status = str2job_state(words[2]); + if (unlikely(SERVING_PLUGINSD(parser) && status == JOB_STATUS_UNKNOWN)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status"); char *message = NULL; - if (num_words == 6) - message = strdupz(words[5]); + if (num_words == 5) + message = words[4]; + + const DICTIONARY_ITEM *plugin_item; + DICTIONARY *job_dict; + const DICTIONARY_ITEM *job_item = report_job_status_acq_lock(parser->user.host->configurable_plugins, &plugin_item, &job_dict, plugin_name, words[0], words[1], status, state, message); + + if (job_item != NULL) { + struct job *job = dictionary_acquired_item_value(job_item); + rrdpush_send_job_status_update(parser->user.host, plugin_name, words[0], job); + + pthread_mutex_unlock(&job->lock); + dictionary_acquired_item_release(job_dict, job_item); + dictionary_acquired_item_release(parser->user.host->configurable_plugins, plugin_item); + } - report_job_status(parser->user.cd->configuration, words[1], words[2], job_status, state, message); + return PARSER_RC_OK; +} + +// job_status [plugin_name if streaming] <module_name> <job_name> <status_code> <state> [message] +static PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) { + if (SERVING_PLUGINSD(parser)) { + if (unlikely(num_words != 5 && num_words != 6)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]"); + } else { + if (unlikely(num_words != 6 && num_words != 7)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 5 or 6 parameters: plugin_name, module_name, job_name, status_code, state, [optional: message]"); + } + + if (SERVING_PLUGINSD(parser)) { + return pluginsd_job_status_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name); + } + return pluginsd_job_status_common(&words[2], num_words - 2, parser, words[1]); +} + +static PARSER_RC pluginsd_delete_job(char **words, size_t num_words, PARSER *parser) { + // this can confuse a bit but there is a diference between KEYWORD_DELETE_JOB and actual delete_job function + // they are of opossite direction + if (num_words != 4) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DELETE_JOB, "expected 2 parameters: plugin_name, module_name, job_name"); + + const char *plugin_name = get_word(words, num_words, 1); + const char *module_name = get_word(words, num_words, 2); + const char *job_name = get_word(words, num_words, 3); + + if (SERVING_STREAMING(parser)) + delete_job_pname(parser->user.host->configurable_plugins, plugin_name, module_name, job_name); + + // forward to parent if any + rrdpush_send_job_deleted(parser->user.host, plugin_name, module_name, job_name); return PARSER_RC_OK; } @@ -2506,8 +2852,14 @@ PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, case 102: return pluginsd_register_module(words, num_words, parser); + case 103: + return pluginsd_register_job(words, num_words, parser); + case 110: return pluginsd_job_status(words, num_words, parser); + + case 111: + return pluginsd_delete_job(words, num_words, parser); default: fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id); @@ -2525,14 +2877,20 @@ void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) { } } +static void parser_destroy_dyncfg(PARSER *parser) { + if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) { + unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item); + parser->user.cd->configuration = NULL; + } else if (parser->user.host != NULL && SERVING_STREAMING(parser) && parser->user.host != localhost){ + dictionary_flush(parser->user.host->configurable_plugins); + } +} + void parser_destroy(PARSER *parser) { if (unlikely(!parser)) return; - if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) { - unregister_plugin(parser->user.cd->cfg_dict_item); - parser->user.cd->configuration = NULL; - } + parser_destroy_dyncfg(parser); dictionary_destroy(parser->inflight.functions); freez(parser); |