diff options
author | Timotej S <6674623+underhood@users.noreply.github.com> | 2023-09-29 17:13:42 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-29 17:13:42 +0200 |
commit | 6dfc99a2e0cf893c9ac2fd56d7986982738a9579 (patch) | |
tree | d5fb21199d99cc681641d7f6f8f33a7f083f435d /collectors/plugins.d | |
parent | dd17442f81ca4bbb97c5b93c7a5525ce4946fbb1 (diff) |
Dyncfg add streaming support (#15791)
* dyncfg fncnames as constants
* add helper macros to know parser streaming/plugin
* plugins dictionary per RRDHOST
* api_request_v2_config add support for /host/
* streamify pluginsd_register_plugin
* streamify pluginsd_register_module
* streamify report_job_status
* streamify dyncfg get functions
* module_type2str
* add job type and flags
* add DYNCFG_REGISTER_JOB
* implement register job
* push all to parent at startup
* add helper function is_dyncfg_function
* forward virtual functions trough streaming
* separate job2json
* add api/v2/job_statuses
* do cleanup on streaming
* streamify set functions
* support FUNCTION_PAYLOAD trough streaming
* WIP tests
* dont attempt loading non-localhost configs
* move cfg persistence to proper place
* prevent race
* properly update job state at runtime
* cleanup 1
* job2json add missing reason
* add tests
* correct HTTP code
* add test
* streamify delete_job_cb
* add DELETE_JOB keyword
* job delete over streaming
* add tests for create and delete job over parent
* rrdpush common checks to macro
* add missing forwarders
* fix jobs according to test results
* more tests
* review comment 1
* codacy remove valid warning
* codacy ruby fixes
* fix wrong rc check
* minimal test plugin for child
* add test
* dict walk insted of master lock
* minor - english spelling fixes
* thiago comments 1
* minor - rename folder to dynconf
* enable only when built with -DNETDATA_TEST_DYNCFG
* minor - compiler warning
* create dir post daemonization
* stricter URL check
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r-- | collectors/plugins.d/gperf-config.txt | 24 | ||||
-rw-r--r-- | collectors/plugins.d/gperf-hashtable.h | 146 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 10 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 494 |
4 files changed, 523 insertions, 151 deletions
diff --git a/collectors/plugins.d/gperf-config.txt b/collectors/plugins.d/gperf-config.txt index b8140e66c5..a1d0c51ba8 100644 --- a/collectors/plugins.d/gperf-config.txt +++ b/collectors/plugins.d/gperf-config.txt @@ -36,20 +36,22 @@ SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_ VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19 DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20 DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21 -REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22 +DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22 +REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23 +DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24 # # Streaming only keywords # -CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23 -BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24 -SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25 -END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26 +CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25 +BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26 +SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27 +END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28 # # Streaming Replication keywords # -CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27 -RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28 -RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29 -REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30 -RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31 -RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32 +CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29 +RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30 +RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31 +REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32 +RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33 +RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34 diff --git a/collectors/plugins.d/gperf-hashtable.h b/collectors/plugins.d/gperf-hashtable.h index e7d20126f6..5bbf9fa984 100644 --- a/collectors/plugins.d/gperf-hashtable.h +++ b/collectors/plugins.d/gperf-hashtable.h @@ -30,12 +30,12 @@ #endif -#define GPERF_PARSER_TOTAL_KEYWORDS 32 +#define GPERF_PARSER_TOTAL_KEYWORDS 34 #define GPERF_PARSER_MIN_WORD_LENGTH 3 #define GPERF_PARSER_MAX_WORD_LENGTH 22 #define GPERF_PARSER_MIN_HASH_VALUE 3 -#define GPERF_PARSER_MAX_HASH_VALUE 41 -/* maximum key range = 39, duplicates = 0 */ +#define GPERF_PARSER_MAX_HASH_VALUE 36 +/* maximum key range = 34, duplicates = 0 */ #ifdef __GNUC__ __inline @@ -49,32 +49,32 @@ gperf_keyword_hash_function (register const char *str, register size_t len) { static unsigned char asso_values[] = { - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 16, 7, 2, 11, 0, - 8, 42, 3, 9, 42, 42, 9, 42, 0, 2, - 42, 42, 1, 3, 42, 7, 17, 42, 27, 2, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, - 42, 42, 42, 42, 42, 42 + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 12, 28, 5, 2, 0, + 0, 37, 3, 13, 37, 37, 14, 37, 0, 2, + 37, 37, 1, 3, 37, 6, 10, 37, 32, 2, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, + 37, 37, 37, 37, 37, 37 }; return len + asso_values[(unsigned char)str[1]] + asso_values[(unsigned char)str[0]]; } @@ -84,70 +84,72 @@ static PARSER_KEYWORD gperf_keywords[] = {(char*)0}, {(char*)0}, {(char*)0}, #line 30 "gperf-config.txt" {"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13}, -#line 46 "gperf-config.txt" - {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26}, -#line 53 "gperf-config.txt" - {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30}, +#line 48 "gperf-config.txt" + {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28}, +#line 55 "gperf-config.txt" + {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32}, #line 35 "gperf-config.txt" {"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18}, -#line 45 "gperf-config.txt" - {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25}, -#line 54 "gperf-config.txt" - {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31}, +#line 47 "gperf-config.txt" + {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27}, +#line 56 "gperf-config.txt" + {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33}, #line 18 "gperf-config.txt" {"HOST", 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4}, +#line 54 "gperf-config.txt" + {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31}, +#line 57 "gperf-config.txt" + {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34}, +#line 41 "gperf-config.txt" + {"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24}, #line 26 "gperf-config.txt" {"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9}, -#line 55 "gperf-config.txt" - {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32}, -#line 25 "gperf-config.txt" - {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8}, -#line 44 "gperf-config.txt" - {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24}, -#line 51 "gperf-config.txt" - {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28}, +#line 31 "gperf-config.txt" + {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14}, #line 21 "gperf-config.txt" {"HOST_LABEL", 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7}, #line 19 "gperf-config.txt" {"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5}, -#line 27 "gperf-config.txt" - {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10}, -#line 39 "gperf-config.txt" - {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22}, -#line 52 "gperf-config.txt" - {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29}, -#line 20 "gperf-config.txt" - {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6}, -#line 43 "gperf-config.txt" - {"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23}, -#line 15 "gperf-config.txt" - {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1}, -#line 31 "gperf-config.txt" - {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14}, -#line 28 "gperf-config.txt" - {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11}, -#line 50 "gperf-config.txt" - {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27}, #line 37 "gperf-config.txt" {"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20}, -#line 16 "gperf-config.txt" - {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2}, +#line 40 "gperf-config.txt" + {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23}, +#line 15 "gperf-config.txt" + {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1}, +#line 20 "gperf-config.txt" + {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6}, #line 34 "gperf-config.txt" {"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17}, +#line 16 "gperf-config.txt" + {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2}, +#line 39 "gperf-config.txt" + {"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22}, #line 29 "gperf-config.txt" {"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12}, -#line 33 "gperf-config.txt" - {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16}, -#line 17 "gperf-config.txt" - {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3}, - {(char*)0}, {(char*)0}, {(char*)0}, +#line 27 "gperf-config.txt" + {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10}, #line 38 "gperf-config.txt" {"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21}, #line 32 "gperf-config.txt" {"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15}, - {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0}, +#line 52 "gperf-config.txt" + {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29}, +#line 45 "gperf-config.txt" + {"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25}, #line 36 "gperf-config.txt" - {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19} + {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19}, +#line 33 "gperf-config.txt" + {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16}, +#line 28 "gperf-config.txt" + {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11}, +#line 25 "gperf-config.txt" + {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8}, +#line 46 "gperf-config.txt" + {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26}, +#line 53 "gperf-config.txt" + {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30}, +#line 17 "gperf-config.txt" + {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3} }; PARSER_KEYWORD * diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index b22b037ba6..7c5df4168e 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -10,6 +10,16 @@ #define PLUGINSD_CMD_MAX (FILENAME_MAX*2) #define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0 +#define PLUGINSD_KEYWORD_FUNCTION_PAYLOAD "FUNCTION_PAYLOAD" +#define PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END "FUNCTION_PAYLOAD_END" + +#define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE" +#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE" +#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB "DYNCFG_REGISTER_JOB" + +#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS" +#define PLUGINSD_KEYWORD_DELETE_JOB "DELETE_JOB" + #define PLUGINSD_LINE_MAX_SSL_READ 512 #define PLUGINSD_MAX_WORDS 20 diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index fbcd812627..d568db5ca2 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -4,6 +4,9 @@ #define LOG_FUNCTIONS false +#define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING) +#define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD) + static ssize_t send_to_plugin(const char *txt, void *data) { PARSER *parser = data; @@ -742,6 +745,7 @@ struct inflight_function { usec_t sent_ut; const char *payload; PARSER *parser; + bool virtual; }; static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) { @@ -808,16 +812,83 @@ static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __m return false; } -static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) { +void delete_job_finalize(struct parser *parser, struct configurable_plugin *plug, const char *fnc_sig, int code) { + if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED) + return; + + char *params_local = strdupz(fnc_sig); + char *words[DYNCFG_MAX_WORDS]; + size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); + + if (words_c != 3) { + netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for delete_job"); + freez(params_local); + return; + } + + const char *module = words[1]; + const char *job = words[2]; + + delete_job(plug, module, job); + + unlink_job(plug->name, module, job); + + rrdpush_send_job_deleted(localhost, plug->name, module, job); + + freez(params_local); +} + +void set_job_finalize(struct parser *parser, struct configurable_plugin *plug, const char *fnc_sig, int code) { + if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED) + return; + + char *params_local = strdupz(fnc_sig); + char *words[DYNCFG_MAX_WORDS]; + size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd); + + if (words_c != 3) { + netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for set_job_config"); + freez(params_local); + return; + } + + const char *module_name = get_word(words, words_c, 1); + const char *job_name = get_word(words, words_c, 2); + + if (register_job(parser->user.host->configurable_plugins, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED, 1)) { + freez(params_local); + return; + } + + // only send this if it is not existing already (register_job cares for that) + rrdpush_send_dyncfg_reg_job(localhost, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED); + + freez(params_local); +} + +static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) { struct inflight_function *pf = func; + struct parser *parser = (struct parser *)parser_ptr; internal_error(LOG_FUNCTIONS, "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)", string2str(pf->function), dictionary_acquired_item_name(item), buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut); + if (pf->virtual && SERVING_PLUGINSD(parser)) { + if (pf->payload) { + if (strncmp(string2str(pf->function), FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0) + set_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code); + dyn_conf_store_config(string2str(pf->function), pf->payload, parser->user.cd->configuration); + } else if (strncmp(string2str(pf->function), FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) { + delete_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code); + } + } + pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data); + string_freez(pf->function); + freez(pf->payload); } void inflight_functions_init(PARSER *parser) { @@ -1947,9 +2018,81 @@ static void virt_fnc_got_data_cb(BUFFER *wb __maybe_unused, int code, void *call } #define VIRT_FNC_TIMEOUT 1 +#define VIRT_FNC_BUF_SIZE (4096) +void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data) { + PARSER *parser = NULL; + + //TODO simplify (as we really need only first parameter to get plugin name maybe we can avoid parsing all) + char *words[PLUGINSD_MAX_WORDS]; + char *function_with_params = strdupz(name); + size_t num_words = quoted_strings_splitter(function_with_params, words, PLUGINSD_MAX_WORDS, isspace_map_pluginsd); + + if (num_words < 2) { + netdata_log_error("PLUGINSD: virtual function name is empty."); + freez(function_with_params); + return; + } + + const DICTIONARY_ITEM *cpi = dictionary_get_and_acquire_item(host->configurable_plugins, get_word(words, num_words, 1)); + if (unlikely(cpi == NULL)) { + netdata_log_error("PLUGINSD: virtual function plugin '%s' not found.", name); + freez(function_with_params); + return; + } + struct configurable_plugin *cp = dictionary_acquired_item_value(cpi); + parser = (PARSER *)cp->cb_usr_ctx; + + BUFFER *function_out = buffer_create(VIRT_FNC_BUF_SIZE, NULL); + // if we are forwarding this to a plugin (as opposed to streaming/child) we have to remove the first parameter (plugin_name) + buffer_strcat(function_out, get_word(words, num_words, 0)); + for (size_t i = 1; i < num_words; i++) { + if (i == 1 && SERVING_PLUGINSD(parser)) + continue; + buffer_sprintf(function_out, " %s", get_word(words, num_words, i)); + } + freez(function_with_params); + + usec_t now = now_realtime_usec(); + + struct inflight_function tmp = { + .started_ut = now, + .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC, + .result_body_wb = wb, + .timeout = VIRT_FNC_TIMEOUT * 10, + .function = string_strdupz(buffer_tostring(function_out)), + .result_cb = callback, + .result_cb_data = callback_data, + .payload = payload != NULL ? strdupz(payload) : NULL, + .virtual = true, + }; + buffer_free(function_out); + + uuid_t uuid; + uuid_generate_time(uuid); + + char key[UUID_STR_LEN]; + uuid_unparse_lower(uuid, key); + + dictionary_write_lock(parser->inflight.functions); + + // if there is any error, our dictionary callbacks will call the caller callback to notify + // the caller about the error - no need for error handling here. + dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); + + if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) + parser->inflight.smaller_timeout = tmp.timeout_ut; + + // garbage collect stale inflight functions + if(parser->inflight.smaller_timeout < now) + inflight_functions_garbage_collect(parser, now); + + dictionary_write_unlock(parser->inflight.functions); +} + + dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) { usec_t now = now_realtime_usec(); - BUFFER *wb = buffer_create(4096, NULL); + BUFFER *wb = buffer_create(VIRT_FNC_BUF_SIZE, NULL); struct mutex_cond cond = { .lock = PTHREAD_MUTEX_INITIALIZER, @@ -1964,7 +2107,8 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, .function = string_strdupz(name), .result_cb = virt_fnc_got_data_cb, .result_cb_data = &cond, - .payload = payload, + .payload = payload != NULL ? strdupz(payload) : NULL, + .virtual = true, }; uuid_t uuid; @@ -2011,98 +2155,188 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, return cfg; } -static dyncfg_config_t get_plugin_config_cb(void *usr_ctx) +#define CVF_MAX_LEN (1024) +static dyncfg_config_t get_plugin_config_cb(void *usr_ctx, const char *plugin_name) { PARSER *parser = usr_ctx; - return call_virtual_function_blocking(parser, "get_plugin_config", NULL, NULL); + + if (SERVING_STREAMING(parser)) { + char buf[CVF_MAX_LEN + 1]; + snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG " %s", plugin_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); + } + + return call_virtual_function_blocking(parser, FUNCTION_NAME_GET_PLUGIN_CONFIG, NULL, NULL); } -static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx) +static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx, const char *plugin_name) { PARSER *parser = usr_ctx; + + if (SERVING_STREAMING(parser)) { + char buf[CVF_MAX_LEN + 1]; + snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA " %s", plugin_name); + return call_virtual_function_blocking(parser, buf, NULL, NULL); + } + return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL); } -static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_module_config %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG); + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_module_config_schema %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA); + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *module_name) +static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_job_config_schema %s", module_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *module_name, const char* job_name) +static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char* job_name) { PARSER *parser = usr_ctx; - char buf[1024]; - snprintfz(buf, sizeof(buf), "get_job_config %s %s", module_name, job_name); - return call_virtual_function_blocking(parser, buf, NULL, NULL); + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + + dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL); + + buffer_free(wb); + + return ret; } -enum set_config_result set_plugin_config_cb(void *usr_ctx, dyncfg_config_t *cfg) +enum set_config_result set_plugin_config_cb(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_PLUGIN_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + int rc; - call_virtual_function_blocking(parser, "set_plugin_config", &rc, cfg->data); - if(rc != 1) + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); + + buffer_free(wb); + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result set_module_config_cb(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg) +enum set_config_result set_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_MODULE_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s", module_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); - char buf[1024]; - snprintfz(buf, sizeof(buf), "set_module_config %s", module_name); - call_virtual_function_blocking(parser, buf, &rc, cfg->data); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result set_job_config_cb(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg) +enum set_config_result set_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_SET_JOB_CONFIG); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data); - char buf[1024]; - snprintfz(buf, sizeof(buf), "set_job_config %s %s", module_name, job_name); - call_virtual_function_blocking(parser, buf, &rc, cfg->data); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } -enum set_config_result delete_job_cb(void *usr_ctx, const char *module_name, const char *job_name) +enum set_config_result delete_job_cb(void *usr_ctx, const char *plugin_name ,const char *module_name, const char *job_name) { PARSER *parser = usr_ctx; + BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL); + + buffer_strcat(wb, FUNCTION_NAME_DELETE_JOB); + + if (SERVING_STREAMING(parser)) + buffer_sprintf(wb, " %s", plugin_name); + + buffer_sprintf(wb, " %s %s", module_name, job_name); + int rc; + call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, NULL); - char buf[1024]; - snprintfz(buf, sizeof(buf), "delete_job %s %s", module_name, job_name); - call_virtual_function_blocking(parser, buf, &rc, NULL); + buffer_free(wb); - if(rc != 1) + if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED) return SET_CONFIG_REJECTED; return SET_CONFIG_ACCEPTED; } @@ -2122,37 +2356,65 @@ static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, si cfg->get_config_schema_cb = get_plugin_config_schema_cb; cfg->cb_usr_ctx = parser; - parser->user.cd->cfg_dict_item = register_plugin(cfg); - - if (unlikely(parser->user.cd->cfg_dict_item == NULL)) { + const DICTIONARY_ITEM *di = register_plugin(parser->user.host->configurable_plugins, cfg, SERVING_PLUGINSD(parser)); + if (unlikely(di == NULL)) { freez(cfg->name); freez(cfg); return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin"); } - parser->user.cd->configuration = cfg; + if (SERVING_PLUGINSD(parser)) { + // this is optimization for pluginsd to avoid extra dictionary lookup + // as we know which plugin is comunicating with us + parser->user.cd->cfg_dict_item = di; + parser->user.cd->configuration = cfg; + } else { + // register_plugin keeps the item acquired, so we need to release it + dictionary_acquired_item_release(parser->user.host->configurable_plugins, di); + } + + rrdpush_send_dyncfg_enable(parser->user.host, cfg->name); + return PARSER_RC_OK; } +#define LOG_MSG_SIZE (1024) +#define MODULE_NAME_IDX (SERVING_PLUGINSD(parser) ? 1 : 2) +#define MODULE_TYPE_IDX (SERVING_PLUGINSD(parser) ? 2 : 3) static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE"); - struct configurable_plugin *plug_cfg = parser->user.cd->configuration; - if (unlikely(plug_cfg == NULL)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE); + size_t expected_num_words = SERVING_PLUGINSD(parser) ? 3 : 4; - if (unlikely(num_words != 3)) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "expected 2 parameters module_name followed by module_type"); + if (unlikely(num_words != expected_num_words)) { + char log[LOG_MSG_SIZE + 1]; + snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name module_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name "); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, log); + } + + struct configurable_plugin *plug_cfg; + const DICTIONARY_ITEM *di = NULL; + if (SERVING_PLUGINSD(parser)) { + plug_cfg = parser->user.cd->configuration; + if (unlikely(plug_cfg == NULL)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE); + } else { + di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]); + if (unlikely(di == NULL)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "plugin not found"); + + plug_cfg = (struct configurable_plugin *)dictionary_acquired_item_value(di); + } struct module *mod = callocz(1, sizeof(struct module)); - mod->type = str2_module_type(words[2]); + mod->type = str2_module_type(words[MODULE_TYPE_IDX]); if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) { freez(mod); return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)"); } - |