diff options
author | vkalintiris <vasilis@netdata.cloud> | 2022-10-31 19:53:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-31 19:53:20 +0200 |
commit | 282e0dfaa97289cc6542742e9e389bd76b7e4164 (patch) | |
tree | b23e108b35adc8ed322e8167d0f1fe607c2cfa4c /collectors | |
parent | df87a538cfaba5014a752937714756b7c5d30c93 (diff) |
Replication of metrics (gaps filling) during streaming (#13873)
* Revert "Use llvm's ar and ranlib when compiling with clang (#13854)"
This reverts commit a9135f47bbb36e9cb437b18a7109607569580db7.
* Profile plugin
* Fix macos static thread
* Add support for replication
- Add a new capability for replication, when not supported the agent
should behave as previously.
- When replication is supported, the text protocol supports the
following new commands:
- CHART_DEFINITION_END: send the first/last entry of the child
- REPLAY_RRDSET_BEGIN: sends the name of the chart we are
replicating
- REPLAY_RRDSET_HEADER: sends a line describing the columns of the
following command (ie. start-time, end-time, dim1-name, ...)
- REPLAY_RRDSET_DONE: sends values to push for a specific start/end
time
- REPLAY_RRDSET_END: send the (a) update every of the chart, (b)
first/last entries in DB, (c) whether the child's been told to
start streaming, (d) original after/before period to replicate.
- REPLAY_CHART: Sent from a parent to a child, specifying (a)
the chart name we want data for, (b) whether the child should
start streaming once it has fullfilled the request with the
aforementioned commands, (c) after/before of the data the parent
wants
- As a consequence of the new protocol, streaming is disabled for all
charts on a new connection. It's enabled once replication is finished.
- The configuration parameters are specified from within stream.conf:
- "enable replication = yes|no"
- "seconds to replicate = 3600"
- "replication step = 600" (ie. how many seconds to fill per
roundtrip request.
* Minor fixes
- quote set and dim ids
- start streaming after writing replicated data to the buffer
- write replicated data only when buffer is less than 50% full.
- use reentrant iteration for charts
* Do not send chart definitions on connection.
* Track replication status through rrdset flags.
* Add debug flag for noisy log messages.
* Add license notice.
* Iterate charts with reentrant loop
* Set replication finished flag when streaming is disabled.
* Revert "Profile plugin"
This reverts commit 468fc9386e5283e0865fae56e9989b8ec83de14d.
Used only for testing purposes.
* Revert "Revert "Use llvm's ar and ranlib when compiling with clang (#13854)""
This reverts commit 27c955c58d95aed6c44d42e8b675f0cf3ca45c6d.
Reapply commit that I had to revert in order to be able to build the
agent on MacOS.
* Build replication source files with CMake.
* Pass number of words in plugind functions.
* Use get_word instead of indexing words.
* Use size_t instead of int.
* Pay only what we use when splitting words.
* no need to redefine PLUGINSD_MAX_WORDS
* fix formatting warning
* all usages of pluginsd_split_words() should use the return value to ensure non-cached results reuse; no need to lock the host to find a chart
* keep a sender dictionary with all the replication commands received and remove replication commands from charts
* do not replicate future data
* use last_updated to find the end of the db
* uniformity of replication logs
* rewrite of the query logic
* replication.c in C; debug info in human readable dates
* update the chart on every replication row
* update all chart members so that rrdset_done() can continue
* update the protocol to push one dimension per line and transfer data collection state to parent
* fix formatting
* remove replication object from pluginsd
* shorter communication
* fix typo
* support for replication proxies
* proper use of flags
* set receiver replication finished flag on charts created after the sender has been connected
* clear RRDSET_FLAG_SYNC_CLOCK on replicated charts
* log storing of nulls
* log first store
* log update every switches
* test ignoring timestamps but sending a point just after replication end
* replication should work on end_time
* use replicated timestamps
* at the final replication step, replicate all the remaining points
* cleanup code from tests
* print timestamps as unsigned long long
* more formating changes; fix conflicting type of replicate_chart_response()
* updated stream.conf
* always respond to replication requests
* in non-dbengine db modes, do not replicate more than the database size
* advance the db pointer of legacy db modes
* should be multiplied by update_every
* fix buggy label parsing - identified by codacy
* dont log error on history mismatches for db mode dbengine
* allow SSL requests to streaming children
* dont use ssl variable
Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Diffstat (limited to 'collectors')
-rw-r--r-- | collectors/apps.plugin/apps_plugin.c | 53 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.c | 2 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 13 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 563 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.h | 16 | ||||
-rw-r--r-- | collectors/statsd.plugin/statsd.c | 18 | ||||
-rw-r--r-- | collectors/tc.plugin/plugin_tc.c | 2 |
7 files changed, 516 insertions, 151 deletions
diff --git a/collectors/apps.plugin/apps_plugin.c b/collectors/apps.plugin/apps_plugin.c index 858bef2c50..084afeb3d6 100644 --- a/collectors/apps.plugin/apps_plugin.c +++ b/collectors/apps.plugin/apps_plugin.c @@ -4311,7 +4311,7 @@ static void apps_plugin_function_processes(const char *transaction, char *functi struct pid_stat *p; char *words[PLUGINSD_MAX_WORDS] = { NULL }; - pluginsd_split_words(function, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); + size_t num_words = pluginsd_split_words(function, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); struct target *category = NULL, *user = NULL, *group = NULL; const char *process_name = NULL; @@ -4322,51 +4322,52 @@ static void apps_plugin_function_processes(const char *transaction, char *functi bool filter_pid = false, filter_uid = false, filter_gid = false; for(int i = 1; i < PLUGINSD_MAX_WORDS ;i++) { - if(!words[i]) break; + const char *keyword = get_word(words, num_words, i); + if(!keyword) break; - if(!category && strncmp(words[i], PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) { - category = find_target_by_name(apps_groups_root_target, &words[i][strlen(PROCESS_FILTER_CATEGORY)]); + if(!category && strncmp(keyword, PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) { + category = find_target_by_name(apps_groups_root_target, &keyword[strlen(PROCESS_FILTER_CATEGORY)]); if(!category) { apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No category with that name found."); return; } } - else if(!user && strncmp(words[i], PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) { - user = find_target_by_name(users_root_target, &words[i][strlen(PROCESS_FILTER_USER)]); + else if(!user && strncmp(keyword, PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) { + user = find_target_by_name(users_root_target, &keyword[strlen(PROCESS_FILTER_USER)]); if(!user) { apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No user with that name found."); return; } } - else if(strncmp(words[i], PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) { - group = find_target_by_name(groups_root_target, &words[i][strlen(PROCESS_FILTER_GROUP)]); + else if(strncmp(keyword, PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) { + group = find_target_by_name(groups_root_target, &keyword[strlen(PROCESS_FILTER_GROUP)]); if(!group) { apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No group with that name found."); return; } } - else if(!process_name && strncmp(words[i], PROCESS_FILTER_PROCESS, strlen(PROCESS_FILTER_PROCESS)) == 0) { - process_name = &words[i][strlen(PROCESS_FILTER_PROCESS)]; + else if(!process_name && strncmp(keyword, PROCESS_FILTER_PROCESS, strlen(PROCESS_FILTER_PROCESS)) == 0) { + process_name = &keyword[strlen(PROCESS_FILTER_PROCESS)]; } - else if(!pid && strncmp(words[i], PROCESS_FILTER_PID, strlen(PROCESS_FILTER_PID)) == 0) { - pid = str2i(&words[i][strlen(PROCESS_FILTER_PID)]); + else if(!pid && strncmp(keyword, PROCESS_FILTER_PID, strlen(PROCESS_FILTER_PID)) == 0) { + pid = str2i(&keyword[strlen(PROCESS_FILTER_PID)]); filter_pid = true; } - else if(!uid && strncmp(words[i], PROCESS_FILTER_UID, strlen(PROCESS_FILTER_UID)) == 0) { - uid = str2i(&words[i][strlen(PROCESS_FILTER_UID)]); + else if(!uid && strncmp(keyword, PROCESS_FILTER_UID, strlen(PROCESS_FILTER_UID)) == 0) { + uid = str2i(&keyword[strlen(PROCESS_FILTER_UID)]); filter_uid = true; } - else if(!gid && strncmp(words[i], PROCESS_FILTER_GID, strlen(PROCESS_FILTER_GID)) == 0) { - gid = str2i(&words[i][strlen(PROCESS_FILTER_GID)]); + else if(!gid && strncmp(keyword, PROCESS_FILTER_GID, strlen(PROCESS_FILTER_GID)) == 0) { + gid = str2i(&keyword[strlen(PROCESS_FILTER_GID)]); filter_gid = true; } - else if(strcmp(words[i], "help") == 0) { + else if(strcmp(keyword, "help") == 0) { apps_plugin_function_processes_help(transaction); return; } else { char msg[PLUGINSD_LINE_MAX]; - snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", words[i]); + snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", keyword); apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, msg); return; } @@ -4779,16 +4780,18 @@ void *reader_main(void *arg __maybe_unused) { while(!apps_plugin_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) { char *words[PLUGINSD_MAX_WORDS] = { NULL }; - pluginsd_split_words(buffer, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); + size_t num_words = pluginsd_split_words(buffer, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); - if(words[0] && strcmp(words[0], PLUGINSD_KEYWORD_FUNCTION) == 0) { - char *transaction = words[1]; - char *timeout_s = words[2]; - char *function = words[3]; + const char *keyword = get_word(words, num_words, 0); + + if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - words[0], + keyword, transaction?transaction:"(unset)", timeout_s?timeout_s:"(unset)", function?function:"(unset)"); @@ -4813,7 +4816,7 @@ void *reader_main(void *arg __maybe_unused) { } } else - error("Received unknown command: %s", words[0]?words[0]:"(unset)"); + error("Received unknown command: %s", keyword?keyword:"(unset)"); } if(!s || feof(stdin) || ferror(stdin)) { diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index 0823233b41..79abc70708 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -6,7 +6,7 @@ char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { NULL }; struct plugind *pluginsd_root = NULL; -inline int pluginsd_initialize_plugin_directories() +inline size_t pluginsd_initialize_plugin_directories() { char plugins_dirs[(FILENAME_MAX * 2) + 1]; static char *plugins_dir_list = NULL; diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index 5a7cccb5b6..c4b4830bef 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -11,6 +11,7 @@ #define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0 #define PLUGINSD_KEYWORD_CHART "CHART" +#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END" #define PLUGINSD_KEYWORD_DIMENSION "DIMENSION" #define PLUGINSD_KEYWORD_BEGIN "BEGIN" #define PLUGINSD_KEYWORD_SET "SET" @@ -29,12 +30,18 @@ #define PLUGINSD_KEYWORD_CONTEXT "CONTEXT" #define PLUGINSD_KEYWORD_TOMBSTONE "TOMBSTONE" #define PLUGINSD_KEYWORD_HOST "HOST" -//#define PLUGINSD_KEYWORD_GAPS_REQUEST "GAPS_REQUEST" // child -> parent -//#define PLUGINSD_KEYWORD_CHART_GAP "CHART_GAP" // parent <- child + +#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART" +#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN" +#define PLUGINSD_KEYWORD_REPLAY_SET "RSET" +#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE" +#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE" +#define PLUGINSD_KEYWORD_REPLAY_END "REND" #define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds #define PLUGINSD_LINE_MAX_SSL_READ 512 + #define PLUGINSD_MAX_WORDS 20 #define PLUGINSD_MAX_DIRECTORIES 20 @@ -69,7 +76,7 @@ extern struct plugind *pluginsd_root; size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations); -int pluginsd_initialize_plugin_directories(); +size_t pluginsd_initialize_plugin_directories(); diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 7978d5d610..3376abc840 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -4,10 +4,35 @@ #define LOG_FUNCTIONS false -PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +static int send_to_plugin(const char *txt, void *data) { + PARSER *parser = data; + + if(!txt || !*txt) + return 0; + +#ifdef ENABLE_HTTPS + struct netdata_ssl *ssl = parser->ssl_output; + if(ssl) { + if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) { + size_t size = strlen(txt); + return SSL_write(ssl->conn, txt, (int)size); + } + + error("cannot write to SSL connection - connection is not ready."); + return -1; + } +#endif + + FILE *fp = parser->output; + int ret = fprintf(fp, "%s", txt); + fflush(fp); + return ret; +} + +PARSER_RC pluginsd_set(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { - char *dimension = words[1]; - char *value = words[2]; + char *dimension = get_word(words, num_words, 1); + char *value = get_word(words, num_words, 2); RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; @@ -47,10 +72,10 @@ disable: return PARSER_RC_ERROR; } -PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { - char *id = words[1]; - char *microseconds_txt = words[2]; + char *id = get_word(words, num_words, 1); + char *microseconds_txt = get_word(words, num_words, 2); RRDSET *st = NULL; RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; @@ -86,9 +111,11 @@ disable: return PARSER_RC_ERROR; } -PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_end(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { UNUSED(words); + UNUSED(num_words); + RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; @@ -107,7 +134,7 @@ PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_actio return PARSER_RC_OK; } -PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) { @@ -115,18 +142,18 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_act return PARSER_RC_OK; } - char *type = words[1]; - char *name = words[2]; - char *title = words[3]; - char *units = words[4]; - char *family = words[5]; - char *context = words[6]; - char *chart = words[7]; - char *priority_s = words[8]; - char *update_every_s = words[9]; - char *options = words[10]; - char *plugin = words[11]; - char *module = words[12]; + char *type = get_word(words, num_words, 1); + char *name = get_word(words, num_words, 2); + char *title = get_word(words, num_words, 3); + char *units = get_word(words, num_words, 4); + char *family = get_word(words, num_words, 5); + char *context = get_word(words, num_words, 6); + char *chart = get_word(words, num_words, 7); + char *priority_s = get_word(words, num_words, 8); + char *update_every_s = get_word(words, num_words, 9); + char *options = get_word(words, num_words, 10); + char *plugin = get_word(words, num_words, 11); + char *module = get_word(words, num_words, 12); // parse the id from type char *id = NULL; @@ -231,14 +258,36 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_act return PARSER_RC_OK; } -PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action) { - char *id = words[1]; - char *name = words[2]; - char *algorithm = words[3]; - char *multiplier_s = words[4]; - char *divisor_s = words[5]; - char *options = words[6]; + UNUSED(plugins_action); + + long first_entry_child = str2l(get_word(words, num_words, 1)); + long last_entry_child = str2l(get_word(words, num_words, 2)); + + PARSER_USER_OBJECT *user_object = (PARSER_USER_OBJECT *) user; + + RRDHOST *host = user_object->host; + RRDSET *st = user_object->st; + if(unlikely(!host || !st)) { + error("REPLAY: received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " command without a chart. Disabling it."); + return PARSER_RC_ERROR; + } + + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + + bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0); + return ok ? PARSER_RC_OK : PARSER_RC_ERROR; +} + +PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +{ + char *id = get_word(words, num_words, 1); + char *name = get_word(words, num_words, 2); + char *algorithm = get_word(words, num_words, 3); + char *multiplier_s = get_word(words, num_words, 4); + char *divisor_s = get_word(words, num_words, 5); + char *options = get_word(words, num_words, 6); RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; @@ -341,16 +390,18 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void struct inflight_function *pf = func; PARSER *parser = parser_ptr; - FILE *fp = parser->output; // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller pf->code = HTTP_RESP_GATEWAY_TIMEOUT; + char buffer[2048 + 1]; + snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n", + dictionary_acquired_item_name(item), + pf->timeout, + string2str(pf->function)); + // send the command to the plugin - int ret = fprintf(fp, "FUNCTION %s %d \"%s\"\n", - dictionary_acquired_item_name(item), - pf->timeout, - string2str(pf->function)); + int ret = send_to_plugin(buffer, parser); pf->sent_ut = now_realtime_usec(); @@ -359,11 +410,9 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); } else { - fflush(fp); - internal_error(LOG_FUNCTIONS, - "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, fd %d, in %llu usec)", - string2str(pf->function), dictionary_acquired_item_name(item), ret, fileno(fp), + "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)", + string2str(pf->function), dictionary_acquired_item_name(item), ret, pf->sent_ut - pf->started_ut); } } @@ -461,18 +510,18 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou return HTTP_RESP_OK; } -PARSER_RC pluginsd_function(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_function(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { bool global = false; - int i = 1; - if(strcmp(words[i], "GLOBAL") == 0) { + size_t i = 1; + if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) { i++; global = true; } - char *name = words[i++]; - char *timeout_s = words[i++]; - char *help = words[i++]; + char *name = get_word(words, num_words, i++); + char *timeout_s = get_word(words, num_words, i++); + char *help = get_word(words, num_words, i++); RRDSET *st = (global)?NULL:((PARSER_USER_OBJECT *) user)->st; RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; @@ -508,12 +557,12 @@ static void pluginsd_function_result_end(struct parser *parser, void *action_dat string_freez(key); } -PARSER_RC pluginsd_function_result_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { - char *key = words[1]; - char *status = words[2]; - char *format = words[3]; - char *expires = words[4]; + char *key = get_word(words, num_words, 1); + char *status = get_word(words, num_words, 2); + char *format = get_word(words, num_words, 3); + char *expires = get_word(words, num_words, 4); if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) { error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')." @@ -564,10 +613,10 @@ PARSER_RC pluginsd_function_result_begin(char **words, void *user, PLUGINSD_ACTI // ---------------------------------------------------------------------------- -PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { - char *name = words[1]; - char *value = words[2]; + char *name = get_word(words, num_words, 1); + char *value = get_word(words, num_words, 2); NETDATA_DOUBLE v; RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; @@ -578,12 +627,12 @@ PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_ if (name && *name) { if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) { global = 1; - name = words[2]; - value = words[3]; + name = get_word(words, num_words, 2); + value = get_word(words, num_words, 3); } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) { global = 0; - name = words[2]; - value = words[3]; + name = get_word(words, num_words, 2); + value = get_word(words, num_words, 3); } } @@ -641,69 +690,78 @@ PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_ return PARSER_RC_OK; } -PARSER_RC pluginsd_flush(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { - UNUSED(words); debug(D_PLUGINSD, "requested a FLUSH"); ((PARSER_USER_OBJECT *) user)->st = NULL; + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; return PARSER_RC_OK; } -PARSER_RC pluginsd_disable(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused, PLUGINSD_ACTION *plugins_action __maybe_unused) { - UNUSED(user); - UNUSED(words); - info("called DISABLE. Disabling it."); ((PARSER_USER_OBJECT *) user)->enabled = 0; return PARSER_RC_ERROR; } -PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_label(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { - char *store; + const char *name = get_word(words, num_words, 1); + const char *label_source = get_word(words, num_words, 2); + const char *value = get_word(words, num_words, 3); - if (!words[1] || !words[2] || !words[3]) { + if (!name || !label_source || !value) { error("Ignoring malformed or empty LABEL command."); return PARSER_RC_OK; } - if (!words[4]) - store = words[3]; - else { - store = callocz(PLUGINSD_LINE_MAX + 1, sizeof(char)); + + char *store = (char *)value; + bool allocated_store = false; + + if(unlikely(num_words > 4)) { + allocated_store = true; + store = mallocz(PLUGINSD_LINE_MAX + 1); size_t remaining = PLUGINSD_LINE_MAX; char *move = store; - int i = 3; - while (i < PLUGINSD_MAX_WORDS) { - size_t length = strlen(words[i]); - if ((length + 1) >= remaining) - break; - - remaining -= (length + 1); - memcpy(move, words[i], length); + char *word; + for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) { + if(i > 3) { + *move++ = ' '; + *move = '\0'; + remaining--; + } + + size_t length = strlen(word); + if (length > remaining) + length = remaining; + + remaining -= length; + memcpy(move, word, length); move += length; - *move++ = ' '; - - i++; - if (!words[i]) - break; + *move = '\0'; } } if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels)) ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create(); - rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels, words[1], store, strtol(words[2], NULL, 10)); + rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels, + name, + store, + str2l(label_source)); - if (store != words[3]) + if (allocated_store) freez(store); + return PARSER_RC_OK; } -PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { - UNUSED(words); - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; debug(D_PLUGINSD, "requested to OVERWRITE host labels"); @@ -719,9 +777,13 @@ PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins } -PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { - if (!words[1] || !words[2] || !words[3]) { + const char *name = get_word(words, num_words, 1); + const char *value = get_word(words, num_words, 2); + const char *label_source = get_word(words, num_words, 3); + + if (!name || !value || !*label_source) { error("Ignoring malformed or empty CHART LABEL command."); return PARSER_RC_OK; } @@ -731,15 +793,14 @@ PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_ac rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); } - rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily, words[1], words[2], strtol(words[3], NULL, 10)); + rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily, + name, value, str2l(label_source)); return PARSER_RC_OK; } -PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) { - UNUSED(words); - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; RRDSET *st = ((PARSER_USER_OBJECT *)user)->st; @@ -762,9 +823,9 @@ PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plu return PARSER_RC_OK; } -PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_guid(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action) { - char *uuid_str = words[1]; + char *uuid_str = get_word(words, num_words, 1); uuid_t uuid; if (unlikely(!uuid_str)) { @@ -784,9 +845,9 @@ PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_actio return PARSER_RC_OK; } -PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_context(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action) { - char *uuid_str = words[1]; + char *uuid_str = get_word(words, num_words, 1); uuid_t uuid; if (unlikely(!uuid_str)) { @@ -806,9 +867,9 @@ PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_ac return PARSER_RC_OK; } -PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_tombstone(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action) { - char *uuid_str = words[1]; + char *uuid_str = get_word(words, num_words, 1); uuid_t uuid; if (unlikely(!uuid_str)) { @@ -828,15 +889,15 @@ PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_ return PARSER_RC_OK; } -PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC metalog_pluginsd_host(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action) { - char *machine_guid = words[1]; - char *hostname = words[2]; - char *registry_hostname = words[3]; - char *update_every_s = words[4]; - char *os = words[5]; - char *timezone = words[6]; - char *tags = words[7]; + char *machine_guid = get_word(words, num_words, 1); + char *hostname = get_word(words, num_words, 2); + char *registry_hostname = get_word(words, num_words, 3); + char *update_every_s = get_word(words, num_words, 4); + char *os = get_word(words, num_words, 5); + char *timezone = get_word(words, num_words, 6); + char *tags = get_word(words, num_words, 7); int update_every = 1; if (likely(update_every_s && *update_every_s)) @@ -855,6 +916,296 @@ PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plug return PARSER_RC_OK; } +PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +{ + char *id = get_word(words, num_words, 1); + char *start_time_str = get_word(words, num_words, 2); + char *end_time_str = get_word(words, num_words, 3); + + RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; + RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; + + if (unlikely(!id || (!st && !*id))) { + error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host)); + goto disable; + } + + if(*id) { + st = rrdset_find(host, id); + if (unlikely(!st)) { + error("requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.", + id, rrdhost_hostname(host)); + goto disable; + } + + ((PARSER_USER_OBJECT *) user)->st = st; + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; + } + + if(start_time_str && end_time_str) { + time_t start_time = strtol(start_time_str, NULL, 0); + time_t end_time = strtol(end_time_str, NULL, 0); + + if(start_time && end_time) { + if (start_time > end_time) { + error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.", + rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time); + goto disable; + } + + if (end_time - start_time != st->update_every) + rrdset_set_update_every(st, end_time - start_time); + + st->last_collected_time.tv_sec = end_time; + st->last_collected_time.tv_usec = 0; + + st->last_updated.tv_sec = end_time; + st->last_updated.tv_usec = 0; + + ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time; + ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; + + st->counter++; + st->counter_done++; + + // these are only needed for db mode RAM, SAVE, MAP, ALLOC + st->current_entry++; + if(st->current_entry >= st->entries) + st->current_entry -= st->entries; + } + } + + return PARSER_RC_OK; + +disable: + ((PARSER_USER_OBJECT *)user)->enabled = 0; + return PARSER_RC_ERROR; +} + +PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused) +{ + char *dimension = get_word(words, num_words, 1); + char *value_str = get_word(words, num_words, 2); + char *flags_str = get_word(words, num_words, 3); + + RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; + RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; + + if (unlikely(!st)) { + error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_SET " on dimension '%s' on host '%s', without a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", |