summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d
diff options
context:
space:
mode:
authorTimotej S <6674623+underhood@users.noreply.github.com>2023-09-29 17:13:42 +0200
committerGitHub <noreply@github.com>2023-09-29 17:13:42 +0200
commit6dfc99a2e0cf893c9ac2fd56d7986982738a9579 (patch)
treed5fb21199d99cc681641d7f6f8f33a7f083f435d /collectors/plugins.d
parentdd17442f81ca4bbb97c5b93c7a5525ce4946fbb1 (diff)
Dyncfg add streaming support (#15791)
* dyncfg fncnames as constants * add helper macros to know parser streaming/plugin * plugins dictionary per RRDHOST * api_request_v2_config add support for /host/ * streamify pluginsd_register_plugin * streamify pluginsd_register_module * streamify report_job_status * streamify dyncfg get functions * module_type2str * add job type and flags * add DYNCFG_REGISTER_JOB * implement register job * push all to parent at startup * add helper function is_dyncfg_function * forward virtual functions trough streaming * separate job2json * add api/v2/job_statuses * do cleanup on streaming * streamify set functions * support FUNCTION_PAYLOAD trough streaming * WIP tests * dont attempt loading non-localhost configs * move cfg persistence to proper place * prevent race * properly update job state at runtime * cleanup 1 * job2json add missing reason * add tests * correct HTTP code * add test * streamify delete_job_cb * add DELETE_JOB keyword * job delete over streaming * add tests for create and delete job over parent * rrdpush common checks to macro * add missing forwarders * fix jobs according to test results * more tests * review comment 1 * codacy remove valid warning * codacy ruby fixes * fix wrong rc check * minimal test plugin for child * add test * dict walk insted of master lock * minor - english spelling fixes * thiago comments 1 * minor - rename folder to dynconf * enable only when built with -DNETDATA_TEST_DYNCFG * minor - compiler warning * create dir post daemonization * stricter URL check
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r--collectors/plugins.d/gperf-config.txt24
-rw-r--r--collectors/plugins.d/gperf-hashtable.h146
-rw-r--r--collectors/plugins.d/plugins_d.h10
-rw-r--r--collectors/plugins.d/pluginsd_parser.c494
4 files changed, 523 insertions, 151 deletions
diff --git a/collectors/plugins.d/gperf-config.txt b/collectors/plugins.d/gperf-config.txt
index b8140e66c5..a1d0c51ba8 100644
--- a/collectors/plugins.d/gperf-config.txt
+++ b/collectors/plugins.d/gperf-config.txt
@@ -36,20 +36,22 @@ SET, 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_
VARIABLE, 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19
DYNCFG_ENABLE, 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20
DYNCFG_REGISTER_MODULE, 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21
-REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22
+DYNCFG_REGISTER_JOB, 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22
+REPORT_JOB_STATUS, 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23
+DELETE_JOB, 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24
#
# Streaming only keywords
#
-CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23
-BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24
-SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25
-END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26
+CLAIMED_ID, 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25
+BEGIN2, 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26
+SET2, 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27
+END2, 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28
#
# Streaming Replication keywords
#
-CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27
-RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28
-RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29
-REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30
-RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31
-RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32
+CHART_DEFINITION_END, 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29
+RBEGIN, 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30
+RDSTATE, 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31
+REND, 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32
+RSET, 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33
+RSSTATE, 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34
diff --git a/collectors/plugins.d/gperf-hashtable.h b/collectors/plugins.d/gperf-hashtable.h
index e7d20126f6..5bbf9fa984 100644
--- a/collectors/plugins.d/gperf-hashtable.h
+++ b/collectors/plugins.d/gperf-hashtable.h
@@ -30,12 +30,12 @@
#endif
-#define GPERF_PARSER_TOTAL_KEYWORDS 32
+#define GPERF_PARSER_TOTAL_KEYWORDS 34
#define GPERF_PARSER_MIN_WORD_LENGTH 3
#define GPERF_PARSER_MAX_WORD_LENGTH 22
#define GPERF_PARSER_MIN_HASH_VALUE 3
-#define GPERF_PARSER_MAX_HASH_VALUE 41
-/* maximum key range = 39, duplicates = 0 */
+#define GPERF_PARSER_MAX_HASH_VALUE 36
+/* maximum key range = 34, duplicates = 0 */
#ifdef __GNUC__
__inline
@@ -49,32 +49,32 @@ gperf_keyword_hash_function (register const char *str, register size_t len)
{
static unsigned char asso_values[] =
{
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 16, 7, 2, 11, 0,
- 8, 42, 3, 9, 42, 42, 9, 42, 0, 2,
- 42, 42, 1, 3, 42, 7, 17, 42, 27, 2,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42, 42, 42, 42, 42,
- 42, 42, 42, 42, 42, 42
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 12, 28, 5, 2, 0,
+ 0, 37, 3, 13, 37, 37, 14, 37, 0, 2,
+ 37, 37, 1, 3, 37, 6, 10, 37, 32, 2,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+ 37, 37, 37, 37, 37, 37
};
return len + asso_values[(unsigned char)str[1]] + asso_values[(unsigned char)str[0]];
}
@@ -84,70 +84,72 @@ static PARSER_KEYWORD gperf_keywords[] =
{(char*)0}, {(char*)0}, {(char*)0},
#line 30 "gperf-config.txt"
{"END", 13, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 13},
-#line 46 "gperf-config.txt"
- {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26},
-#line 53 "gperf-config.txt"
- {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30},
+#line 48 "gperf-config.txt"
+ {"END2", 3, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28},
+#line 55 "gperf-config.txt"
+ {"REND", 25, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32},
#line 35 "gperf-config.txt"
{"SET", 11, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 18},
-#line 45 "gperf-config.txt"
- {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25},
-#line 54 "gperf-config.txt"
- {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31},
+#line 47 "gperf-config.txt"
+ {"SET2", 1, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27},
+#line 56 "gperf-config.txt"
+ {"RSET", 21, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 33},
#line 18 "gperf-config.txt"
{"HOST", 71, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 4},
+#line 54 "gperf-config.txt"
+ {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 31},
+#line 57 "gperf-config.txt"
+ {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 34},
+#line 41 "gperf-config.txt"
+ {"DELETE_JOB", 111, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24},
#line 26 "gperf-config.txt"
{"CHART", 32, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 9},
-#line 55 "gperf-config.txt"
- {"RSSTATE", 24, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 32},
-#line 25 "gperf-config.txt"
- {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8},
-#line 44 "gperf-config.txt"
- {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 24},
-#line 51 "gperf-config.txt"
- {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 28},
+#line 31 "gperf-config.txt"
+ {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14},
#line 21 "gperf-config.txt"
{"HOST_LABEL", 74, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 7},
#line 19 "gperf-config.txt"
{"HOST_DEFINE", 72, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 5},
-#line 27 "gperf-config.txt"
- {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10},
-#line 39 "gperf-config.txt"
- {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22},
-#line 52 "gperf-config.txt"
- {"RDSTATE", 23, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29},
-#line 20 "gperf-config.txt"
- {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6},
-#line 43 "gperf-config.txt"
- {"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23},
-#line 15 "gperf-config.txt"
- {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1},
-#line 31 "gperf-config.txt"
- {"FUNCTION", 41, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 14},
-#line 28 "gperf-config.txt"
- {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11},
-#line 50 "gperf-config.txt"
- {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 27},
#line 37 "gperf-config.txt"
{"DYNCFG_ENABLE", 101, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 20},
-#line 16 "gperf-config.txt"
- {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2},
+#line 40 "gperf-config.txt"
+ {"REPORT_JOB_STATUS", 110, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 23},
+#line 15 "gperf-config.txt"
+ {"FLUSH", 97, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 1},
+#line 20 "gperf-config.txt"
+ {"HOST_DEFINE_END", 73, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 6},
#line 34 "gperf-config.txt"
{"OVERWRITE", 52, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 17},
+#line 16 "gperf-config.txt"
+ {"DISABLE", 98, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 2},
+#line 39 "gperf-config.txt"
+ {"DYNCFG_REGISTER_JOB", 103, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 22},
#line 29 "gperf-config.txt"
{"DIMENSION", 31, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 12},
-#line 33 "gperf-config.txt"
- {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16},
-#line 17 "gperf-config.txt"
- {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3},
- {(char*)0}, {(char*)0}, {(char*)0},
+#line 27 "gperf-config.txt"
+ {"CLABEL", 34, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 10},
#line 38 "gperf-config.txt"
{"DYNCFG_REGISTER_MODULE", 102, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 21},
#line 32 "gperf-config.txt"
{"FUNCTION_RESULT_BEGIN", 42, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 15},
- {(char*)0}, {(char*)0}, {(char*)0}, {(char*)0},
+#line 52 "gperf-config.txt"
+ {"CHART_DEFINITION_END", 33, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 29},
+#line 45 "gperf-config.txt"
+ {"CLAIMED_ID", 61, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 25},
#line 36 "gperf-config.txt"
- {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19}
+ {"VARIABLE", 53, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 19},
+#line 33 "gperf-config.txt"
+ {"LABEL", 51, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 16},
+#line 28 "gperf-config.txt"
+ {"CLABEL_COMMIT", 35, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 11},
+#line 25 "gperf-config.txt"
+ {"BEGIN", 12, PARSER_INIT_PLUGINSD|PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 8},
+#line 46 "gperf-config.txt"
+ {"BEGIN2", 2, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 26},
+#line 53 "gperf-config.txt"
+ {"RBEGIN", 22, PARSER_INIT_STREAMING, WORKER_PARSER_FIRST_JOB + 30},
+#line 17 "gperf-config.txt"
+ {"EXIT", 99, PARSER_INIT_PLUGINSD, WORKER_PARSER_FIRST_JOB + 3}
};
PARSER_KEYWORD *
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index b22b037ba6..7c5df4168e 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -10,6 +10,16 @@
#define PLUGINSD_CMD_MAX (FILENAME_MAX*2)
#define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0
+#define PLUGINSD_KEYWORD_FUNCTION_PAYLOAD "FUNCTION_PAYLOAD"
+#define PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END "FUNCTION_PAYLOAD_END"
+
+#define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE"
+#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE"
+#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB "DYNCFG_REGISTER_JOB"
+
+#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS"
+#define PLUGINSD_KEYWORD_DELETE_JOB "DELETE_JOB"
+
#define PLUGINSD_LINE_MAX_SSL_READ 512
#define PLUGINSD_MAX_WORDS 20
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index fbcd812627..d568db5ca2 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -4,6 +4,9 @@
#define LOG_FUNCTIONS false
+#define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING)
+#define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD)
+
static ssize_t send_to_plugin(const char *txt, void *data) {
PARSER *parser = data;
@@ -742,6 +745,7 @@ struct inflight_function {
usec_t sent_ut;
const char *payload;
PARSER *parser;
+ bool virtual;
};
static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
@@ -808,16 +812,83 @@ static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __m
return false;
}
-static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) {
+void delete_job_finalize(struct parser *parser, struct configurable_plugin *plug, const char *fnc_sig, int code) {
+ if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED)
+ return;
+
+ char *params_local = strdupz(fnc_sig);
+ char *words[DYNCFG_MAX_WORDS];
+ size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd);
+
+ if (words_c != 3) {
+ netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for delete_job");
+ freez(params_local);
+ return;
+ }
+
+ const char *module = words[1];
+ const char *job = words[2];
+
+ delete_job(plug, module, job);
+
+ unlink_job(plug->name, module, job);
+
+ rrdpush_send_job_deleted(localhost, plug->name, module, job);
+
+ freez(params_local);
+}
+
+void set_job_finalize(struct parser *parser, struct configurable_plugin *plug, const char *fnc_sig, int code) {
+ if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED)
+ return;
+
+ char *params_local = strdupz(fnc_sig);
+ char *words[DYNCFG_MAX_WORDS];
+ size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd);
+
+ if (words_c != 3) {
+ netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for set_job_config");
+ freez(params_local);
+ return;
+ }
+
+ const char *module_name = get_word(words, words_c, 1);
+ const char *job_name = get_word(words, words_c, 2);
+
+ if (register_job(parser->user.host->configurable_plugins, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED, 1)) {
+ freez(params_local);
+ return;
+ }
+
+ // only send this if it is not existing already (register_job cares for that)
+ rrdpush_send_dyncfg_reg_job(localhost, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED);
+
+ freez(params_local);
+}
+
+static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) {
struct inflight_function *pf = func;
+ struct parser *parser = (struct parser *)parser_ptr;
internal_error(LOG_FUNCTIONS,
"FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)",
string2str(pf->function), dictionary_acquired_item_name(item),
buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
+ if (pf->virtual && SERVING_PLUGINSD(parser)) {
+ if (pf->payload) {
+ if (strncmp(string2str(pf->function), FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0)
+ set_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code);
+ dyn_conf_store_config(string2str(pf->function), pf->payload, parser->user.cd->configuration);
+ } else if (strncmp(string2str(pf->function), FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) {
+ delete_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code);
+ }
+ }
+
pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data);
+
string_freez(pf->function);
+ freez(pf->payload);
}
void inflight_functions_init(PARSER *parser) {
@@ -1947,9 +2018,81 @@ static void virt_fnc_got_data_cb(BUFFER *wb __maybe_unused, int code, void *call
}
#define VIRT_FNC_TIMEOUT 1
+#define VIRT_FNC_BUF_SIZE (4096)
+void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data) {
+ PARSER *parser = NULL;
+
+ //TODO simplify (as we really need only first parameter to get plugin name maybe we can avoid parsing all)
+ char *words[PLUGINSD_MAX_WORDS];
+ char *function_with_params = strdupz(name);
+ size_t num_words = quoted_strings_splitter(function_with_params, words, PLUGINSD_MAX_WORDS, isspace_map_pluginsd);
+
+ if (num_words < 2) {
+ netdata_log_error("PLUGINSD: virtual function name is empty.");
+ freez(function_with_params);
+ return;
+ }
+
+ const DICTIONARY_ITEM *cpi = dictionary_get_and_acquire_item(host->configurable_plugins, get_word(words, num_words, 1));
+ if (unlikely(cpi == NULL)) {
+ netdata_log_error("PLUGINSD: virtual function plugin '%s' not found.", name);
+ freez(function_with_params);
+ return;
+ }
+ struct configurable_plugin *cp = dictionary_acquired_item_value(cpi);
+ parser = (PARSER *)cp->cb_usr_ctx;
+
+ BUFFER *function_out = buffer_create(VIRT_FNC_BUF_SIZE, NULL);
+ // if we are forwarding this to a plugin (as opposed to streaming/child) we have to remove the first parameter (plugin_name)
+ buffer_strcat(function_out, get_word(words, num_words, 0));
+ for (size_t i = 1; i < num_words; i++) {
+ if (i == 1 && SERVING_PLUGINSD(parser))
+ continue;
+ buffer_sprintf(function_out, " %s", get_word(words, num_words, i));
+ }
+ freez(function_with_params);
+
+ usec_t now = now_realtime_usec();
+
+ struct inflight_function tmp = {
+ .started_ut = now,
+ .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC,
+ .result_body_wb = wb,
+ .timeout = VIRT_FNC_TIMEOUT * 10,
+ .function = string_strdupz(buffer_tostring(function_out)),
+ .result_cb = callback,
+ .result_cb_data = callback_data,
+ .payload = payload != NULL ? strdupz(payload) : NULL,
+ .virtual = true,
+ };
+ buffer_free(function_out);
+
+ uuid_t uuid;
+ uuid_generate_time(uuid);
+
+ char key[UUID_STR_LEN];
+ uuid_unparse_lower(uuid, key);
+
+ dictionary_write_lock(parser->inflight.functions);
+
+ // if there is any error, our dictionary callbacks will call the caller callback to notify
+ // the caller about the error - no need for error handling here.
+ dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
+
+ if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
+ parser->inflight.smaller_timeout = tmp.timeout_ut;
+
+ // garbage collect stale inflight functions
+ if(parser->inflight.smaller_timeout < now)
+ inflight_functions_garbage_collect(parser, now);
+
+ dictionary_write_unlock(parser->inflight.functions);
+}
+
+
dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) {
usec_t now = now_realtime_usec();
- BUFFER *wb = buffer_create(4096, NULL);
+ BUFFER *wb = buffer_create(VIRT_FNC_BUF_SIZE, NULL);
struct mutex_cond cond = {
.lock = PTHREAD_MUTEX_INITIALIZER,
@@ -1964,7 +2107,8 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name,
.function = string_strdupz(name),
.result_cb = virt_fnc_got_data_cb,
.result_cb_data = &cond,
- .payload = payload,
+ .payload = payload != NULL ? strdupz(payload) : NULL,
+ .virtual = true,
};
uuid_t uuid;
@@ -2011,98 +2155,188 @@ dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name,
return cfg;
}
-static dyncfg_config_t get_plugin_config_cb(void *usr_ctx)
+#define CVF_MAX_LEN (1024)
+static dyncfg_config_t get_plugin_config_cb(void *usr_ctx, const char *plugin_name)
{
PARSER *parser = usr_ctx;
- return call_virtual_function_blocking(parser, "get_plugin_config", NULL, NULL);
+
+ if (SERVING_STREAMING(parser)) {
+ char buf[CVF_MAX_LEN + 1];
+ snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG " %s", plugin_name);
+ return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ }
+
+ return call_virtual_function_blocking(parser, FUNCTION_NAME_GET_PLUGIN_CONFIG, NULL, NULL);
}
-static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx)
+static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx, const char *plugin_name)
{
PARSER *parser = usr_ctx;
+
+ if (SERVING_STREAMING(parser)) {
+ char buf[CVF_MAX_LEN + 1];
+ snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA " %s", plugin_name);
+ return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ }
+
return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL);
}
-static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *module_name)
+static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_module_config %s", module_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG);
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *module_name)
+static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_module_config_schema %s", module_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA);
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *module_name)
+static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_job_config_schema %s", module_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *module_name, const char* job_name)
+static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char* job_name)
{
PARSER *parser = usr_ctx;
- char buf[1024];
- snprintfz(buf, sizeof(buf), "get_job_config %s %s", module_name, job_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s %s", module_name, job_name);
+
+ dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
+
+ buffer_free(wb);
+
+ return ret;
}
-enum set_config_result set_plugin_config_cb(void *usr_ctx, dyncfg_config_t *cfg)
+enum set_config_result set_plugin_config_cb(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_SET_PLUGIN_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
int rc;
- call_virtual_function_blocking(parser, "set_plugin_config", &rc, cfg->data);
- if(rc != 1)
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
+
+ buffer_free(wb);
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
-enum set_config_result set_module_config_cb(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg)
+enum set_config_result set_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_SET_MODULE_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s", module_name);
+
int rc;
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
- char buf[1024];
- snprintfz(buf, sizeof(buf), "set_module_config %s", module_name);
- call_virtual_function_blocking(parser, buf, &rc, cfg->data);
+ buffer_free(wb);
- if(rc != 1)
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
-enum set_config_result set_job_config_cb(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg)
+enum set_config_result set_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_SET_JOB_CONFIG);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s %s", module_name, job_name);
+
int rc;
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
- char buf[1024];
- snprintfz(buf, sizeof(buf), "set_job_config %s %s", module_name, job_name);
- call_virtual_function_blocking(parser, buf, &rc, cfg->data);
+ buffer_free(wb);
- if(rc != 1)
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
-enum set_config_result delete_job_cb(void *usr_ctx, const char *module_name, const char *job_name)
+enum set_config_result delete_job_cb(void *usr_ctx, const char *plugin_name ,const char *module_name, const char *job_name)
{
PARSER *parser = usr_ctx;
+ BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
+
+ buffer_strcat(wb, FUNCTION_NAME_DELETE_JOB);
+
+ if (SERVING_STREAMING(parser))
+ buffer_sprintf(wb, " %s", plugin_name);
+
+ buffer_sprintf(wb, " %s %s", module_name, job_name);
+
int rc;
+ call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, NULL);
- char buf[1024];
- snprintfz(buf, sizeof(buf), "delete_job %s %s", module_name, job_name);
- call_virtual_function_blocking(parser, buf, &rc, NULL);
+ buffer_free(wb);
- if(rc != 1)
+ if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
return SET_CONFIG_REJECTED;
return SET_CONFIG_ACCEPTED;
}
@@ -2122,37 +2356,65 @@ static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, si
cfg->get_config_schema_cb = get_plugin_config_schema_cb;
cfg->cb_usr_ctx = parser;
- parser->user.cd->cfg_dict_item = register_plugin(cfg);
-
- if (unlikely(parser->user.cd->cfg_dict_item == NULL)) {
+ const DICTIONARY_ITEM *di = register_plugin(parser->user.host->configurable_plugins, cfg, SERVING_PLUGINSD(parser));
+ if (unlikely(di == NULL)) {
freez(cfg->name);
freez(cfg);
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin");
}
- parser->user.cd->configuration = cfg;
+ if (SERVING_PLUGINSD(parser)) {
+ // this is optimization for pluginsd to avoid extra dictionary lookup
+ // as we know which plugin is comunicating with us
+ parser->user.cd->cfg_dict_item = di;
+ parser->user.cd->configuration = cfg;
+ } else {
+ // register_plugin keeps the item acquired, so we need to release it
+ dictionary_acquired_item_release(parser->user.host->configurable_plugins, di);
+ }
+
+ rrdpush_send_dyncfg_enable(parser->user.host, cfg->name);
+
return PARSER_RC_OK;
}
+#define LOG_MSG_SIZE (1024)
+#define MODULE_NAME_IDX (SERVING_PLUGINSD(parser) ? 1 : 2)
+#define MODULE_TYPE_IDX (SERVING_PLUGINSD(parser) ? 2 : 3)
static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE");
- struct configurable_plugin *plug_cfg = parser->user.cd->configuration;
- if (unlikely(plug_cfg == NULL))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE);
+ size_t expected_num_words = SERVING_PLUGINSD(parser) ? 3 : 4;
- if (unlikely(num_words != 3))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "expected 2 parameters module_name followed by module_type");
+ if (unlikely(num_words != expected_num_words)) {
+ char log[LOG_MSG_SIZE + 1];
+ snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name module_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name ");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, log);
+ }
+
+ struct configurable_plugin *plug_cfg;
+ const DICTIONARY_ITEM *di = NULL;
+ if (SERVING_PLUGINSD(parser)) {
+ plug_cfg = parser->user.cd->configuration;
+ if (unlikely(plug_cfg == NULL))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE);
+ } else {
+ di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]);
+ if (unlikely(di == NULL))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "plugin not found");
+
+ plug_cfg = (struct configurable_plugin *)dictionary_acquired_item_value(di);
+ }
struct module *mod = callocz(1, sizeof(struct module));
- mod->type = str2_module_type(words[2]);
+ mod->type = str2_module_type(words[MODULE_TYPE_IDX]);
if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) {
freez(mod);
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)");
}
-