summaryrefslogtreecommitdiffstats
path: root/collectors
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2022-10-31 19:53:20 +0200
committerGitHub <noreply@github.com>2022-10-31 19:53:20 +0200
commit282e0dfaa97289cc6542742e9e389bd76b7e4164 (patch)
treeb23e108b35adc8ed322e8167d0f1fe607c2cfa4c /collectors
parentdf87a538cfaba5014a752937714756b7c5d30c93 (diff)
Replication of metrics (gaps filling) during streaming (#13873)
* Revert "Use llvm's ar and ranlib when compiling with clang (#13854)" This reverts commit a9135f47bbb36e9cb437b18a7109607569580db7. * Profile plugin * Fix macos static thread * Add support for replication - Add a new capability for replication, when not supported the agent should behave as previously. - When replication is supported, the text protocol supports the following new commands: - CHART_DEFINITION_END: send the first/last entry of the child - REPLAY_RRDSET_BEGIN: sends the name of the chart we are replicating - REPLAY_RRDSET_HEADER: sends a line describing the columns of the following command (ie. start-time, end-time, dim1-name, ...) - REPLAY_RRDSET_DONE: sends values to push for a specific start/end time - REPLAY_RRDSET_END: send the (a) update every of the chart, (b) first/last entries in DB, (c) whether the child's been told to start streaming, (d) original after/before period to replicate. - REPLAY_CHART: Sent from a parent to a child, specifying (a) the chart name we want data for, (b) whether the child should start streaming once it has fullfilled the request with the aforementioned commands, (c) after/before of the data the parent wants - As a consequence of the new protocol, streaming is disabled for all charts on a new connection. It's enabled once replication is finished. - The configuration parameters are specified from within stream.conf: - "enable replication = yes|no" - "seconds to replicate = 3600" - "replication step = 600" (ie. how many seconds to fill per roundtrip request. * Minor fixes - quote set and dim ids - start streaming after writing replicated data to the buffer - write replicated data only when buffer is less than 50% full. - use reentrant iteration for charts * Do not send chart definitions on connection. * Track replication status through rrdset flags. * Add debug flag for noisy log messages. * Add license notice. * Iterate charts with reentrant loop * Set replication finished flag when streaming is disabled. * Revert "Profile plugin" This reverts commit 468fc9386e5283e0865fae56e9989b8ec83de14d. Used only for testing purposes. * Revert "Revert "Use llvm's ar and ranlib when compiling with clang (#13854)"" This reverts commit 27c955c58d95aed6c44d42e8b675f0cf3ca45c6d. Reapply commit that I had to revert in order to be able to build the agent on MacOS. * Build replication source files with CMake. * Pass number of words in plugind functions. * Use get_word instead of indexing words. * Use size_t instead of int. * Pay only what we use when splitting words. * no need to redefine PLUGINSD_MAX_WORDS * fix formatting warning * all usages of pluginsd_split_words() should use the return value to ensure non-cached results reuse; no need to lock the host to find a chart * keep a sender dictionary with all the replication commands received and remove replication commands from charts * do not replicate future data * use last_updated to find the end of the db * uniformity of replication logs * rewrite of the query logic * replication.c in C; debug info in human readable dates * update the chart on every replication row * update all chart members so that rrdset_done() can continue * update the protocol to push one dimension per line and transfer data collection state to parent * fix formatting * remove replication object from pluginsd * shorter communication * fix typo * support for replication proxies * proper use of flags * set receiver replication finished flag on charts created after the sender has been connected * clear RRDSET_FLAG_SYNC_CLOCK on replicated charts * log storing of nulls * log first store * log update every switches * test ignoring timestamps but sending a point just after replication end * replication should work on end_time * use replicated timestamps * at the final replication step, replicate all the remaining points * cleanup code from tests * print timestamps as unsigned long long * more formating changes; fix conflicting type of replicate_chart_response() * updated stream.conf * always respond to replication requests * in non-dbengine db modes, do not replicate more than the database size * advance the db pointer of legacy db modes * should be multiplied by update_every * fix buggy label parsing - identified by codacy * dont log error on history mismatches for db mode dbengine * allow SSL requests to streaming children * dont use ssl variable Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Diffstat (limited to 'collectors')
-rw-r--r--collectors/apps.plugin/apps_plugin.c53
-rw-r--r--collectors/plugins.d/plugins_d.c2
-rw-r--r--collectors/plugins.d/plugins_d.h13
-rw-r--r--collectors/plugins.d/pluginsd_parser.c563
-rw-r--r--collectors/plugins.d/pluginsd_parser.h16
-rw-r--r--collectors/statsd.plugin/statsd.c18
-rw-r--r--collectors/tc.plugin/plugin_tc.c2
7 files changed, 516 insertions, 151 deletions
diff --git a/collectors/apps.plugin/apps_plugin.c b/collectors/apps.plugin/apps_plugin.c
index 858bef2c50..084afeb3d6 100644
--- a/collectors/apps.plugin/apps_plugin.c
+++ b/collectors/apps.plugin/apps_plugin.c
@@ -4311,7 +4311,7 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
struct pid_stat *p;
char *words[PLUGINSD_MAX_WORDS] = { NULL };
- pluginsd_split_words(function, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
+ size_t num_words = pluginsd_split_words(function, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
struct target *category = NULL, *user = NULL, *group = NULL;
const char *process_name = NULL;
@@ -4322,51 +4322,52 @@ static void apps_plugin_function_processes(const char *transaction, char *functi
bool filter_pid = false, filter_uid = false, filter_gid = false;
for(int i = 1; i < PLUGINSD_MAX_WORDS ;i++) {
- if(!words[i]) break;
+ const char *keyword = get_word(words, num_words, i);
+ if(!keyword) break;
- if(!category && strncmp(words[i], PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) {
- category = find_target_by_name(apps_groups_root_target, &words[i][strlen(PROCESS_FILTER_CATEGORY)]);
+ if(!category && strncmp(keyword, PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) {
+ category = find_target_by_name(apps_groups_root_target, &keyword[strlen(PROCESS_FILTER_CATEGORY)]);
if(!category) {
apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No category with that name found.");
return;
}
}
- else if(!user && strncmp(words[i], PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) {
- user = find_target_by_name(users_root_target, &words[i][strlen(PROCESS_FILTER_USER)]);
+ else if(!user && strncmp(keyword, PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) {
+ user = find_target_by_name(users_root_target, &keyword[strlen(PROCESS_FILTER_USER)]);
if(!user) {
apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No user with that name found.");
return;
}
}
- else if(strncmp(words[i], PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) {
- group = find_target_by_name(groups_root_target, &words[i][strlen(PROCESS_FILTER_GROUP)]);
+ else if(strncmp(keyword, PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) {
+ group = find_target_by_name(groups_root_target, &keyword[strlen(PROCESS_FILTER_GROUP)]);
if(!group) {
apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No group with that name found.");
return;
}
}
- else if(!process_name && strncmp(words[i], PROCESS_FILTER_PROCESS, strlen(PROCESS_FILTER_PROCESS)) == 0) {
- process_name = &words[i][strlen(PROCESS_FILTER_PROCESS)];
+ else if(!process_name && strncmp(keyword, PROCESS_FILTER_PROCESS, strlen(PROCESS_FILTER_PROCESS)) == 0) {
+ process_name = &keyword[strlen(PROCESS_FILTER_PROCESS)];
}
- else if(!pid && strncmp(words[i], PROCESS_FILTER_PID, strlen(PROCESS_FILTER_PID)) == 0) {
- pid = str2i(&words[i][strlen(PROCESS_FILTER_PID)]);
+ else if(!pid && strncmp(keyword, PROCESS_FILTER_PID, strlen(PROCESS_FILTER_PID)) == 0) {
+ pid = str2i(&keyword[strlen(PROCESS_FILTER_PID)]);
filter_pid = true;
}
- else if(!uid && strncmp(words[i], PROCESS_FILTER_UID, strlen(PROCESS_FILTER_UID)) == 0) {
- uid = str2i(&words[i][strlen(PROCESS_FILTER_UID)]);
+ else if(!uid && strncmp(keyword, PROCESS_FILTER_UID, strlen(PROCESS_FILTER_UID)) == 0) {
+ uid = str2i(&keyword[strlen(PROCESS_FILTER_UID)]);
filter_uid = true;
}
- else if(!gid && strncmp(words[i], PROCESS_FILTER_GID, strlen(PROCESS_FILTER_GID)) == 0) {
- gid = str2i(&words[i][strlen(PROCESS_FILTER_GID)]);
+ else if(!gid && strncmp(keyword, PROCESS_FILTER_GID, strlen(PROCESS_FILTER_GID)) == 0) {
+ gid = str2i(&keyword[strlen(PROCESS_FILTER_GID)]);
filter_gid = true;
}
- else if(strcmp(words[i], "help") == 0) {
+ else if(strcmp(keyword, "help") == 0) {
apps_plugin_function_processes_help(transaction);
return;
}
else {
char msg[PLUGINSD_LINE_MAX];
- snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", words[i]);
+ snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", keyword);
apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, msg);
return;
}
@@ -4779,16 +4780,18 @@ void *reader_main(void *arg __maybe_unused) {
while(!apps_plugin_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
char *words[PLUGINSD_MAX_WORDS] = { NULL };
- pluginsd_split_words(buffer, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
+ size_t num_words = pluginsd_split_words(buffer, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0);
- if(words[0] && strcmp(words[0], PLUGINSD_KEYWORD_FUNCTION) == 0) {
- char *transaction = words[1];
- char *timeout_s = words[2];
- char *function = words[3];
+ const char *keyword = get_word(words, num_words, 0);
+
+ if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ char *transaction = get_word(words, num_words, 1);
+ char *timeout_s = get_word(words, num_words, 2);
+ char *function = get_word(words, num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
- words[0],
+ keyword,
transaction?transaction:"(unset)",
timeout_s?timeout_s:"(unset)",
function?function:"(unset)");
@@ -4813,7 +4816,7 @@ void *reader_main(void *arg __maybe_unused) {
}
}
else
- error("Received unknown command: %s", words[0]?words[0]:"(unset)");
+ error("Received unknown command: %s", keyword?keyword:"(unset)");
}
if(!s || feof(stdin) || ferror(stdin)) {
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index 0823233b41..79abc70708 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -6,7 +6,7 @@
char *plugin_directories[PLUGINSD_MAX_DIRECTORIES] = { NULL };
struct plugind *pluginsd_root = NULL;
-inline int pluginsd_initialize_plugin_directories()
+inline size_t pluginsd_initialize_plugin_directories()
{
char plugins_dirs[(FILENAME_MAX * 2) + 1];
static char *plugins_dir_list = NULL;
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index 5a7cccb5b6..c4b4830bef 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -11,6 +11,7 @@
#define PLUGINSD_STOCK_PLUGINS_DIRECTORY_PATH 0
#define PLUGINSD_KEYWORD_CHART "CHART"
+#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END"
#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION"
#define PLUGINSD_KEYWORD_BEGIN "BEGIN"
#define PLUGINSD_KEYWORD_SET "SET"
@@ -29,12 +30,18 @@
#define PLUGINSD_KEYWORD_CONTEXT "CONTEXT"
#define PLUGINSD_KEYWORD_TOMBSTONE "TOMBSTONE"
#define PLUGINSD_KEYWORD_HOST "HOST"
-//#define PLUGINSD_KEYWORD_GAPS_REQUEST "GAPS_REQUEST" // child -> parent
-//#define PLUGINSD_KEYWORD_CHART_GAP "CHART_GAP" // parent <- child
+
+#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART"
+#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN"
+#define PLUGINSD_KEYWORD_REPLAY_SET "RSET"
+#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE"
+#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE"
+#define PLUGINSD_KEYWORD_REPLAY_END "REND"
#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds
#define PLUGINSD_LINE_MAX_SSL_READ 512
+
#define PLUGINSD_MAX_WORDS 20
#define PLUGINSD_MAX_DIRECTORIES 20
@@ -69,7 +76,7 @@ extern struct plugind *pluginsd_root;
size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations);
-int pluginsd_initialize_plugin_directories();
+size_t pluginsd_initialize_plugin_directories();
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 7978d5d610..3376abc840 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -4,10 +4,35 @@
#define LOG_FUNCTIONS false
-PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+static int send_to_plugin(const char *txt, void *data) {
+ PARSER *parser = data;
+
+ if(!txt || !*txt)
+ return 0;
+
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl *ssl = parser->ssl_output;
+ if(ssl) {
+ if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) {
+ size_t size = strlen(txt);
+ return SSL_write(ssl->conn, txt, (int)size);
+ }
+
+ error("cannot write to SSL connection - connection is not ready.");
+ return -1;
+ }
+#endif
+
+ FILE *fp = parser->output;
+ int ret = fprintf(fp, "%s", txt);
+ fflush(fp);
+ return ret;
+}
+
+PARSER_RC pluginsd_set(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
- char *dimension = words[1];
- char *value = words[2];
+ char *dimension = get_word(words, num_words, 1);
+ char *value = get_word(words, num_words, 2);
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
@@ -47,10 +72,10 @@ disable:
return PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
- char *id = words[1];
- char *microseconds_txt = words[2];
+ char *id = get_word(words, num_words, 1);
+ char *microseconds_txt = get_word(words, num_words, 2);
RRDSET *st = NULL;
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
@@ -86,9 +111,11 @@ disable:
return PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_end(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
UNUSED(words);
+ UNUSED(num_words);
+
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
@@ -107,7 +134,7 @@ PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_actio
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) {
@@ -115,18 +142,18 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_act
return PARSER_RC_OK;
}
- char *type = words[1];
- char *name = words[2];
- char *title = words[3];
- char *units = words[4];
- char *family = words[5];
- char *context = words[6];
- char *chart = words[7];
- char *priority_s = words[8];
- char *update_every_s = words[9];
- char *options = words[10];
- char *plugin = words[11];
- char *module = words[12];
+ char *type = get_word(words, num_words, 1);
+ char *name = get_word(words, num_words, 2);
+ char *title = get_word(words, num_words, 3);
+ char *units = get_word(words, num_words, 4);
+ char *family = get_word(words, num_words, 5);
+ char *context = get_word(words, num_words, 6);
+ char *chart = get_word(words, num_words, 7);
+ char *priority_s = get_word(words, num_words, 8);
+ char *update_every_s = get_word(words, num_words, 9);
+ char *options = get_word(words, num_words, 10);
+ char *plugin = get_word(words, num_words, 11);
+ char *module = get_word(words, num_words, 12);
// parse the id from type
char *id = NULL;
@@ -231,14 +258,36 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_act
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
{
- char *id = words[1];
- char *name = words[2];
- char *algorithm = words[3];
- char *multiplier_s = words[4];
- char *divisor_s = words[5];
- char *options = words[6];
+ UNUSED(plugins_action);
+
+ long first_entry_child = str2l(get_word(words, num_words, 1));
+ long last_entry_child = str2l(get_word(words, num_words, 2));
+
+ PARSER_USER_OBJECT *user_object = (PARSER_USER_OBJECT *) user;
+
+ RRDHOST *host = user_object->host;
+ RRDSET *st = user_object->st;
+ if(unlikely(!host || !st)) {
+ error("REPLAY: received " PLUGINSD_KEYWORD_CHART_DEFINITION_END " command without a chart. Disabling it.");
+ return PARSER_RC_ERROR;
+ }
+
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+
+ bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, first_entry_child, last_entry_child, 0, 0);
+ return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
+}
+
+PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+{
+ char *id = get_word(words, num_words, 1);
+ char *name = get_word(words, num_words, 2);
+ char *algorithm = get_word(words, num_words, 3);
+ char *multiplier_s = get_word(words, num_words, 4);
+ char *divisor_s = get_word(words, num_words, 5);
+ char *options = get_word(words, num_words, 6);
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
@@ -341,16 +390,18 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
struct inflight_function *pf = func;
PARSER *parser = parser_ptr;
- FILE *fp = parser->output;
// leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
+ char buffer[2048 + 1];
+ snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n",
+ dictionary_acquired_item_name(item),
+ pf->timeout,
+ string2str(pf->function));
+
// send the command to the plugin
- int ret = fprintf(fp, "FUNCTION %s %d \"%s\"\n",
- dictionary_acquired_item_name(item),
- pf->timeout,
- string2str(pf->function));
+ int ret = send_to_plugin(buffer, parser);
pf->sent_ut = now_realtime_usec();
@@ -359,11 +410,9 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
}
else {
- fflush(fp);
-
internal_error(LOG_FUNCTIONS,
- "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, fd %d, in %llu usec)",
- string2str(pf->function), dictionary_acquired_item_name(item), ret, fileno(fp),
+ "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
+ string2str(pf->function), dictionary_acquired_item_name(item), ret,
pf->sent_ut - pf->started_ut);
}
}
@@ -461,18 +510,18 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou
return HTTP_RESP_OK;
}
-PARSER_RC pluginsd_function(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_function(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
bool global = false;
- int i = 1;
- if(strcmp(words[i], "GLOBAL") == 0) {
+ size_t i = 1;
+ if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
i++;
global = true;
}
- char *name = words[i++];
- char *timeout_s = words[i++];
- char *help = words[i++];
+ char *name = get_word(words, num_words, i++);
+ char *timeout_s = get_word(words, num_words, i++);
+ char *help = get_word(words, num_words, i++);
RRDSET *st = (global)?NULL:((PARSER_USER_OBJECT *) user)->st;
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
@@ -508,12 +557,12 @@ static void pluginsd_function_result_end(struct parser *parser, void *action_dat
string_freez(key);
}
-PARSER_RC pluginsd_function_result_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
- char *key = words[1];
- char *status = words[2];
- char *format = words[3];
- char *expires = words[4];
+ char *key = get_word(words, num_words, 1);
+ char *status = get_word(words, num_words, 2);
+ char *format = get_word(words, num_words, 3);
+ char *expires = get_word(words, num_words, 4);
if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
@@ -564,10 +613,10 @@ PARSER_RC pluginsd_function_result_begin(char **words, void *user, PLUGINSD_ACTI
// ----------------------------------------------------------------------------
-PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
- char *name = words[1];
- char *value = words[2];
+ char *name = get_word(words, num_words, 1);
+ char *value = get_word(words, num_words, 2);
NETDATA_DOUBLE v;
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
@@ -578,12 +627,12 @@ PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_
if (name && *name) {
if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
global = 1;
- name = words[2];
- value = words[3];
+ name = get_word(words, num_words, 2);
+ value = get_word(words, num_words, 3);
} else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
global = 0;
- name = words[2];
- value = words[3];
+ name = get_word(words, num_words, 2);
+ value = get_word(words, num_words, 3);
}
}
@@ -641,69 +690,78 @@ PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_flush(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
- UNUSED(words);
debug(D_PLUGINSD, "requested a FLUSH");
((PARSER_USER_OBJECT *) user)->st = NULL;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_disable(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
- UNUSED(user);
- UNUSED(words);
-
info("called DISABLE. Disabling it.");
((PARSER_USER_OBJECT *) user)->enabled = 0;
return PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_label(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
- char *store;
+ const char *name = get_word(words, num_words, 1);
+ const char *label_source = get_word(words, num_words, 2);
+ const char *value = get_word(words, num_words, 3);
- if (!words[1] || !words[2] || !words[3]) {
+ if (!name || !label_source || !value) {
error("Ignoring malformed or empty LABEL command.");
return PARSER_RC_OK;
}
- if (!words[4])
- store = words[3];
- else {
- store = callocz(PLUGINSD_LINE_MAX + 1, sizeof(char));
+
+ char *store = (char *)value;
+ bool allocated_store = false;
+
+ if(unlikely(num_words > 4)) {
+ allocated_store = true;
+ store = mallocz(PLUGINSD_LINE_MAX + 1);
size_t remaining = PLUGINSD_LINE_MAX;
char *move = store;
- int i = 3;
- while (i < PLUGINSD_MAX_WORDS) {
- size_t length = strlen(words[i]);
- if ((length + 1) >= remaining)
- break;
-
- remaining -= (length + 1);
- memcpy(move, words[i], length);
+ char *word;
+ for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
+ if(i > 3) {
+ *move++ = ' ';
+ *move = '\0';
+ remaining--;
+ }
+
+ size_t length = strlen(word);
+ if (length > remaining)
+ length = remaining;
+
+ remaining -= length;
+ memcpy(move, word, length);
move += length;
- *move++ = ' ';
-
- i++;
- if (!words[i])
- break;
+ *move = '\0';
}
}
if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels))
((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create();
- rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels, words[1], store, strtol(words[2], NULL, 10));
+ rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels,
+ name,
+ store,
+ str2l(label_source));
- if (store != words[3])
+ if (allocated_store)
freez(store);
+
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
- UNUSED(words);
-
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
debug(D_PLUGINSD, "requested to OVERWRITE host labels");
@@ -719,9 +777,13 @@ PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins
}
-PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
- if (!words[1] || !words[2] || !words[3]) {
+ const char *name = get_word(words, num_words, 1);
+ const char *value = get_word(words, num_words, 2);
+ const char *label_source = get_word(words, num_words, 3);
+
+ if (!name || !value || !*label_source) {
error("Ignoring malformed or empty CHART LABEL command.");
return PARSER_RC_OK;
}
@@ -731,15 +793,14 @@ PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_ac
rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
}
- rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily, words[1], words[2], strtol(words[3], NULL, 10));
+ rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily,
+ name, value, str2l(label_source));
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
{
- UNUSED(words);
-
RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
RRDSET *st = ((PARSER_USER_OBJECT *)user)->st;
@@ -762,9 +823,9 @@ PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plu
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_guid(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
{
- char *uuid_str = words[1];
+ char *uuid_str = get_word(words, num_words, 1);
uuid_t uuid;
if (unlikely(!uuid_str)) {
@@ -784,9 +845,9 @@ PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_actio
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_context(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
{
- char *uuid_str = words[1];
+ char *uuid_str = get_word(words, num_words, 1);
uuid_t uuid;
if (unlikely(!uuid_str)) {
@@ -806,9 +867,9 @@ PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_ac
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_tombstone(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
{
- char *uuid_str = words[1];
+ char *uuid_str = get_word(words, num_words, 1);
uuid_t uuid;
if (unlikely(!uuid_str)) {
@@ -828,15 +889,15 @@ PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_
return PARSER_RC_OK;
}
-PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC metalog_pluginsd_host(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action)
{
- char *machine_guid = words[1];
- char *hostname = words[2];
- char *registry_hostname = words[3];
- char *update_every_s = words[4];
- char *os = words[5];
- char *timezone = words[6];
- char *tags = words[7];
+ char *machine_guid = get_word(words, num_words, 1);
+ char *hostname = get_word(words, num_words, 2);
+ char *registry_hostname = get_word(words, num_words, 3);
+ char *update_every_s = get_word(words, num_words, 4);
+ char *os = get_word(words, num_words, 5);
+ char *timezone = get_word(words, num_words, 6);
+ char *tags = get_word(words, num_words, 7);
int update_every = 1;
if (likely(update_every_s && *update_every_s))
@@ -855,6 +916,296 @@ PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plug
return PARSER_RC_OK;
}
+PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+{
+ char *id = get_word(words, num_words, 1);
+ char *start_time_str = get_word(words, num_words, 2);
+ char *end_time_str = get_word(words, num_words, 3);
+
+ RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+ RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+
+ if (unlikely(!id || (!st && !*id))) {
+ error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " without a chart id for host '%s'. Disabling it.", rrdhost_hostname(host));
+ goto disable;
+ }
+
+ if(*id) {
+ st = rrdset_find(host, id);
+ if (unlikely(!st)) {
+ error("requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s', which does not exist on host '%s'. Disabling it.",
+ id, rrdhost_hostname(host));
+ goto disable;
+ }
+
+ ((PARSER_USER_OBJECT *) user)->st = st;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ }
+
+ if(start_time_str && end_time_str) {
+ time_t start_time = strtol(start_time_str, NULL, 0);
+ time_t end_time = strtol(end_time_str, NULL, 0);
+
+ if(start_time && end_time) {
+ if (start_time > end_time) {
+ error("REPLAY: requested a " PLUGINSD_KEYWORD_REPLAY_BEGIN " on chart '%s' ('%s') on host '%s', but timings are invalid (%ld to %ld). Disabling it.",
+ rrdset_name(st), rrdset_id(st), rrdhost_hostname(st->rrdhost), start_time, end_time);
+ goto disable;
+ }
+
+ if (end_time - start_time != st->update_every)
+ rrdset_set_update_every(st, end_time - start_time);
+
+ st->last_collected_time.tv_sec = end_time;
+ st->last_collected_time.tv_usec = 0;
+
+ st->last_updated.tv_sec = end_time;
+ st->last_updated.tv_usec = 0;
+
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
+
+ st->counter++;
+ st->counter_done++;
+
+ // these are only needed for db mode RAM, SAVE, MAP, ALLOC
+ st->current_entry++;
+ if(st->current_entry >= st->entries)
+ st->current_entry -= st->entries;
+ }
+ }
+
+ return PARSER_RC_OK;
+
+disable:
+ ((PARSER_USER_OBJECT *)user)->enabled = 0;
+ return PARSER_RC_ERROR;
+}
+
+PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user, PLUGINSD_ACTION *plugins_action __maybe_unused)
+{
+ char *dimension = get_word(words, num_words, 1);
+ char *value_str = get_word(words, num_words, 2);
+ char *flags_str = get_word(words, num_words, 3);
+
+ RRDSET *st = ((PARSER_USER_OBJECT *