diff options
author | vkalintiris <vasilis@netdata.cloud> | 2022-10-31 19:53:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-31 19:53:20 +0200 |
commit | 282e0dfaa97289cc6542742e9e389bd76b7e4164 (patch) | |
tree | b23e108b35adc8ed322e8167d0f1fe607c2cfa4c /collectors/apps.plugin | |
parent | df87a538cfaba5014a752937714756b7c5d30c93 (diff) |
Replication of metrics (gaps filling) during streaming (#13873)
* Revert "Use llvm's ar and ranlib when compiling with clang (#13854)"
This reverts commit a9135f47bbb36e9cb437b18a7109607569580db7.
* Profile plugin
* Fix macos static thread
* Add support for replication
- Add a new capability for replication, when not supported the agent
should behave as previously.
- When replication is supported, the text protocol supports the
following new commands:
- CHART_DEFINITION_END: send the first/last entry of the child
- REPLAY_RRDSET_BEGIN: sends the name of the chart we are
replicating
- REPLAY_RRDSET_HEADER: sends a line describing the columns of the
following command (ie. start-time, end-time, dim1-name, ...)
- REPLAY_RRDSET_DONE: sends values to push for a specific start/end
time
- REPLAY_RRDSET_END: send the (a) update every of the chart, (b)
first/last entries in DB, (c) whether the child's been told to
start streaming, (d) original after/before period to replicate.
- REPLAY_CHART: Sent from a parent to a child, specifying (a)
the chart name we want data for, (b) whether the child should
start streaming once it has fullfilled the request with the
aforementioned commands, (c) after/before of the data the parent
wants
- As a consequence of the new protocol, streaming is disabled for all
charts on a new connection. It's enabled once replication is finished.
- The configuration parameters are specified from within stream.conf:
- "enable replication = yes|no"
- "seconds to replicate = 3600"
- "replication step = 600" (ie. how many seconds to fill per
roundtrip request.
* Minor fixes
- quote set and dim ids
- start streaming after writing replicated data to the buffer
- write replicated data only when buffer is less than 50% full.
- use reentrant iteration for charts
* Do not send chart definitions on connection.
* Track replication status through rrdset flags.
* Add debug flag for noisy log messages.
* Add license notice.
* Iterate charts with reentrant loop
* Set replication finished flag when streaming is disabled.
* Revert "Profile plugin"
This reverts commit 468fc9386e5283e0865fae56e9989b8ec83de14d.
Used only for testing purposes.
* Revert "Revert "Use llvm's ar and ranlib when compiling with clang (#13854)""
This reverts commit 27c955c58d95aed6c44d42e8b675f0cf3ca45c6d.
Reapply commit that I had to revert in order to be able to build the
agent on MacOS.
* Build replication source files with CMake.
* Pass number of words in plugind functions.
* Use get_word instead of indexing words.
* Use size_t instead of int.
* Pay only what we use when splitting words.
* no need to redefine PLUGINSD_MAX_WORDS
* fix formatting warning
* all usages of pluginsd_split_words() should use the return value to ensure non-cached results reuse; no need to lock the host to find a chart
* keep a sender dictionary with all the replication commands received and remove replication commands from charts
* do not replicate future data
* use last_updated to find the end of the db
* uniformity of replication logs
* rewrite of the query logic
* replication.c in C; debug info in human readable dates
* update the chart on every replication row
* update all chart members so that rrdset_done() can continue
* update the protocol to push one dimension per line and transfer data collection state to parent
* fix formatting
* remove replication object from pluginsd
* shorter communication
* fix typo
* support for replication proxies
* proper use of flags
* set receiver replication finished flag on charts created after the sender has been connected
* clear RRDSET_FLAG_SYNC_CLOCK on replicated charts
* log storing of nulls
* log first store
* log update every switches
* test ignoring timestamps but sending a point just after replication end
* replication should work on end_time
* use replicated timestamps
* at the final replication step, replicate all the remaining points
* cleanup code from tests
* print timestamps as unsigned long long
* more formating changes; fix conflicting type of replicate_chart_response()
* updated stream.conf
* always respond to replication requests
* in non-dbengine db modes, do not replicate more than the database size
* advance the db pointer of legacy db modes
* should be multiplied by update_every
* fix buggy label parsing - identified by codacy
* dont log error on history mismatches for db mode dbengine
* allow SSL requests to streaming children
* dont use ssl variable
Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Diffstat (limited to 'collectors/apps.plugin')
-rw-r--r-- | collectors/apps.plugin/apps_plugin.c | 53 |
1 files changed, 28 insertions, 25 deletions
diff --git a/collectors/apps.plugin/apps_plugin.c b/collectors/apps.plugin/apps_plugin.c index 858bef2c50..084afeb3d6 100644 --- a/collectors/apps.plugin/apps_plugin.c +++ b/collectors/apps.plugin/apps_plugin.c @@ -4311,7 +4311,7 @@ static void apps_plugin_function_processes(const char *transaction, char *functi struct pid_stat *p; char *words[PLUGINSD_MAX_WORDS] = { NULL }; - pluginsd_split_words(function, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); + size_t num_words = pluginsd_split_words(function, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); struct target *category = NULL, *user = NULL, *group = NULL; const char *process_name = NULL; @@ -4322,51 +4322,52 @@ static void apps_plugin_function_processes(const char *transaction, char *functi bool filter_pid = false, filter_uid = false, filter_gid = false; for(int i = 1; i < PLUGINSD_MAX_WORDS ;i++) { - if(!words[i]) break; + const char *keyword = get_word(words, num_words, i); + if(!keyword) break; - if(!category && strncmp(words[i], PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) { - category = find_target_by_name(apps_groups_root_target, &words[i][strlen(PROCESS_FILTER_CATEGORY)]); + if(!category && strncmp(keyword, PROCESS_FILTER_CATEGORY, strlen(PROCESS_FILTER_CATEGORY)) == 0) { + category = find_target_by_name(apps_groups_root_target, &keyword[strlen(PROCESS_FILTER_CATEGORY)]); if(!category) { apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No category with that name found."); return; } } - else if(!user && strncmp(words[i], PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) { - user = find_target_by_name(users_root_target, &words[i][strlen(PROCESS_FILTER_USER)]); + else if(!user && strncmp(keyword, PROCESS_FILTER_USER, strlen(PROCESS_FILTER_USER)) == 0) { + user = find_target_by_name(users_root_target, &keyword[strlen(PROCESS_FILTER_USER)]); if(!user) { apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No user with that name found."); return; } } - else if(strncmp(words[i], PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) { - group = find_target_by_name(groups_root_target, &words[i][strlen(PROCESS_FILTER_GROUP)]); + else if(strncmp(keyword, PROCESS_FILTER_GROUP, strlen(PROCESS_FILTER_GROUP)) == 0) { + group = find_target_by_name(groups_root_target, &keyword[strlen(PROCESS_FILTER_GROUP)]); if(!group) { apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, "No group with that name found."); return; } } - else if(!process_name && strncmp(words[i], PROCESS_FILTER_PROCESS, strlen(PROCESS_FILTER_PROCESS)) == 0) { - process_name = &words[i][strlen(PROCESS_FILTER_PROCESS)]; + else if(!process_name && strncmp(keyword, PROCESS_FILTER_PROCESS, strlen(PROCESS_FILTER_PROCESS)) == 0) { + process_name = &keyword[strlen(PROCESS_FILTER_PROCESS)]; } - else if(!pid && strncmp(words[i], PROCESS_FILTER_PID, strlen(PROCESS_FILTER_PID)) == 0) { - pid = str2i(&words[i][strlen(PROCESS_FILTER_PID)]); + else if(!pid && strncmp(keyword, PROCESS_FILTER_PID, strlen(PROCESS_FILTER_PID)) == 0) { + pid = str2i(&keyword[strlen(PROCESS_FILTER_PID)]); filter_pid = true; } - else if(!uid && strncmp(words[i], PROCESS_FILTER_UID, strlen(PROCESS_FILTER_UID)) == 0) { - uid = str2i(&words[i][strlen(PROCESS_FILTER_UID)]); + else if(!uid && strncmp(keyword, PROCESS_FILTER_UID, strlen(PROCESS_FILTER_UID)) == 0) { + uid = str2i(&keyword[strlen(PROCESS_FILTER_UID)]); filter_uid = true; } - else if(!gid && strncmp(words[i], PROCESS_FILTER_GID, strlen(PROCESS_FILTER_GID)) == 0) { - gid = str2i(&words[i][strlen(PROCESS_FILTER_GID)]); + else if(!gid && strncmp(keyword, PROCESS_FILTER_GID, strlen(PROCESS_FILTER_GID)) == 0) { + gid = str2i(&keyword[strlen(PROCESS_FILTER_GID)]); filter_gid = true; } - else if(strcmp(words[i], "help") == 0) { + else if(strcmp(keyword, "help") == 0) { apps_plugin_function_processes_help(transaction); return; } else { char msg[PLUGINSD_LINE_MAX]; - snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", words[i]); + snprintfz(msg, PLUGINSD_LINE_MAX, "Invalid parameter '%s'", keyword); apps_plugin_function_error(transaction, HTTP_RESP_BAD_REQUEST, msg); return; } @@ -4779,16 +4780,18 @@ void *reader_main(void *arg __maybe_unused) { while(!apps_plugin_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) { char *words[PLUGINSD_MAX_WORDS] = { NULL }; - pluginsd_split_words(buffer, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); + size_t num_words = pluginsd_split_words(buffer, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); - if(words[0] && strcmp(words[0], PLUGINSD_KEYWORD_FUNCTION) == 0) { - char *transaction = words[1]; - char *timeout_s = words[2]; - char *function = words[3]; + const char *keyword = get_word(words, num_words, 0); + + if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - words[0], + keyword, transaction?transaction:"(unset)", timeout_s?timeout_s:"(unset)", function?function:"(unset)"); @@ -4813,7 +4816,7 @@ void *reader_main(void *arg __maybe_unused) { } } else - error("Received unknown command: %s", words[0]?words[0]:"(unset)"); + error("Received unknown command: %s", keyword?keyword:"(unset)"); } if(!s || feof(stdin) || ferror(stdin)) { |