summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-06-03 08:38:25 +0200
committerGitHub <noreply@github.com>2020-06-03 08:38:25 +0200
commit49719a961d6c079004b65458ea8c5e08ada1c44c (patch)
tree258b25ac60c403696a72b1589d5fa8634dfc6764 /collectors/plugins.d
parent1aa2cd7c43f6dd68b4bb43a87eb8b2995687ca9c (diff)
Fix bugs in streaming and enable support for gap filling (#9214)
This PR adds (inactive) support that we will use to fill the gaps on chart when a receiving agent goes offline and the sender reconnects. The streaming component has been reworked to make the connection bi-directional and fix several outstanding bugs in the area. * Fixed an incorrect case of version negotiation. Removed fatal() on exhaustion of fds. * Fixed cases that fell through to polling the socket after closing. * Fixed locking of data related to sender and receiver in the host structure. * Added fine-grained locks to reduce contention. * Added circular buffer to sender to prevent starvation in high-latency conditions. * Fixed case where agent is a proxy and negotiated different streaming versions with sender and receiver. * Changed interface to new parser to put the buffering code in streaming. * Fixed the bug that stopped senders from reconnecting after their socket times out - this was part of the scaling fixes that provide an early shortcut path for rejecting connections without lock contention. * Uses fine-grained locking and a different approach to thread shutdown instead. * Added liveness detection to connections to allow selection of the best connection.
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r--collectors/plugins.d/plugins_d.c98
-rw-r--r--collectors/plugins.d/plugins_d.h2
-rw-r--r--collectors/plugins.d/pluginsd_parser.c3
-rw-r--r--collectors/plugins.d/pluginsd_parser.h14
4 files changed, 16 insertions, 101 deletions
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index e39d9b0376..42889fa8ca 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -139,104 +139,6 @@ inline int pluginsd_split_words(char *str, char **words, int max_words, char *re
return quoted_strings_splitter(str, words, max_words, pluginsd_space, recover_input, recover_location, max_recover);
}
-#ifdef ENABLE_HTTPS
-/**
- * Update Buffer
- *
- * Update the temporary buffer used to parse data received from slave
- *
- * @param output is a pointer to the vector where I will store the data
- * @param ssl is the connection pointer with the server
- *
- * @return it returns the total of bytes read on success and a negative number otherwise
- */
-int pluginsd_update_buffer(char *output, SSL *ssl)
-{
- ERR_clear_error();
- int bytesleft = SSL_read(ssl, output, PLUGINSD_LINE_MAX_SSL_READ);
- if (bytesleft <= 0) {
- int sslerrno = SSL_get_error(ssl, bytesleft);
- switch (sslerrno) {
- case SSL_ERROR_WANT_READ:
- case SSL_ERROR_WANT_WRITE: {
- break;
- }
- default: {
- u_long err;
- char buf[256];
- int counter = 0;
- while ((err = ERR_get_error()) != 0) {
- ERR_error_string_n(err, buf, sizeof(buf));
- info(
- "%d SSL Handshake error (%s) on socket %d ", counter++,
- ERR_error_string((long)SSL_get_error(ssl, bytesleft), NULL), SSL_get_fd(ssl));
- }
- }
- }
- } else {
- output[bytesleft] = '\0';
- }
-
- return bytesleft;
-}
-
-/**
- * Get from Buffer
- *
- * Get data to process from buffer
- *
- * @param output is the output vector that will be used to parse the string.
- * @param bytesread the amount of bytes read in the previous iteration.
- * @param input the input vector where there are data to process
- * @param ssl a pointer to the connection with the server
- * @param src the first address of the input, because sometime will be necessary to restart the addr with it.
- *
- * @return It returns a pointer for the next iteration on success and NULL otherwise.
- */
-char *pluginsd_get_from_buffer(char *output, int *bytesread, char *input, SSL *ssl, char *src)
-{
- int copying = 1;
- char *endbuffer;
- size_t length;
- while (copying) {
- if (*bytesread > 0) {
- endbuffer = strchr(input, '\n');
- if (endbuffer) {
- copying = 0;
- endbuffer++; //Advance due the fact I wanna copy '\n'
- length = endbuffer - input;
- *bytesread -= length;
-
- memcpy(output, input, length);
- output += length;
- *output = '\0';
- input += length;
- } else {
- length = strlen(input);
- memcpy(output, input, length);
- output += length;
- input = src;
-
- *bytesread = pluginsd_update_buffer(input, ssl);
- if (*bytesread <= 0) {
- input = NULL;
- copying = 0;
- }
- }
- } else {
- //reduce sample of bytes read, print the length
- *bytesread = pluginsd_update_buffer(input, ssl);
- if (*bytesread <= 0) {
- input = NULL;
- copying = 0;
- }
- }
- }
-
- return input;
-}
-#endif
-
static void pluginsd_worker_thread_cleanup(void *arg)
{
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index e3589c6717..d8bb1b955f 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -80,7 +80,5 @@ extern int pluginsd_initialize_plugin_directories();
extern int config_isspace(char c);
extern int pluginsd_space(char c);
-extern int pluginsd_update_buffer(char *output, SSL *ssl);
-extern char * pluginsd_get_from_buffer(char *output, int *bytesread, char *input, SSL *ssl, char *src);
#endif /* NETDATA_PLUGINS_D_H */
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 19a73e859e..90558f7e58 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -239,7 +239,8 @@ PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_act
microseconds = str2ull(microseconds_txt);
if (plugins_action->begin_action) {
- return plugins_action->begin_action(user, st, microseconds, ((PARSER_USER_OBJECT *)user)->trust_durations);
+ return plugins_action->begin_action(user, st, microseconds,
+ ((PARSER_USER_OBJECT *)user)->trust_durations);
}
return PARSER_RC_OK;
disable:
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index 4551db080d..ea9ef40b58 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -10,6 +10,7 @@ typedef struct parser_user_object {
PARSER *parser;
RRDSET *st;
RRDHOST *host;
+ void *opaque;
struct plugind *cd;
int trust_durations;
struct label *new_labels;
@@ -17,4 +18,17 @@ typedef struct parser_user_object {
int enabled;
} PARSER_USER_OBJECT;
+PARSER_RC pluginsd_set_action(void *user, RRDSET *st, RRDDIM *rd, long long int value);
+PARSER_RC pluginsd_flush_action(void *user, RRDSET *st);
+PARSER_RC pluginsd_begin_action(void *user, RRDSET *st, usec_t microseconds, int trust_durations);
+PARSER_RC pluginsd_end_action(void *user, RRDSET *st);
+PARSER_RC pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context, char *title, char *units, char *plugin,
+ char *module, int priority, int update_every, RRDSET_TYPE chart_type, char *options);
+PARSER_RC pluginsd_disable_action(void *user);
+PARSER_RC pluginsd_variable_action(void *user, RRDHOST *host, RRDSET *st, char *name, int global, calculated_number value);
+PARSER_RC pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, long multiplier, long divisor, char *options,
+ RRD_ALGORITHM algorithm_type);
+PARSER_RC pluginsd_label_action(void *user, char *key, char *value, LABEL_SOURCE source);
+PARSER_RC pluginsd_overwrite_action(void *user, RRDHOST *host, struct label *new_labels);
+
#endif //NETDATA_PLUGINSD_PARSER_H