diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-20 23:47:53 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-20 23:47:53 +0200 |
commit | 284f6f3aa4f36cefad2601c490510621496c2b53 (patch) | |
tree | 97a7d55627ef7477f431c53a20d0e6f1f738a419 /parser | |
parent | 2d02484954f68bf7e3015cb649e2f10a9f3c5c95 (diff) |
streaming compression, query planner and replication fixes (#14023)
* streaming compression, query planner and replication fixes
* remove journal v2 stats from global statistics
* disable sql for checking past sql UUIDs
* single threaded replication
* final replication thread using dictionaries and JudyL for sorting the pending requests
* do not timeout the sending socket when there are pending replication requests
* streaming receiver using read() instead of fread()
* remove FILE * from streaming - now using posix read() and write()
* increase timeouts to 10 minutes
* apply sender timeout only when there are metrics that are supposed to be streamed
* error handling in replication
* remove retries on socket read timeout; better error messages
* take into account inbound traffic too to detect that a connection is stale
* remove race conditions from replication thread
* make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed
* 2 minutes timeout to retry streaming to a parent that already has this node
* remove unecessary condition check
* fix compilation warnings
* include judy in replication
* wrappers to handle retries for SSL_read and SSL_write
* compressed bytes read monitoring
* recursive locks on replication to make it faster during flush or cleanup
* replication completion chart at the receiver side
* simplified recursive mutex
* simplified recursive mutex again
Diffstat (limited to 'parser')
-rw-r--r-- | parser/parser.c | 59 | ||||
-rw-r--r-- | parser/parser.h | 13 |
2 files changed, 48 insertions, 24 deletions
diff --git a/parser/parser.c b/parser/parser.c index 3eeba8816a..5b4c528de1 100644 --- a/parser/parser.c +++ b/parser/parser.c @@ -29,14 +29,15 @@ inline int find_first_keyword(const char *str, char *keyword, int max_size, int * */ -PARSER *parser_init(RRDHOST *host, void *user, void *input, void *output, PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) +PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) { PARSER *parser; parser = callocz(1, sizeof(*parser)); parser->user = user; - parser->input = input; - parser->output = output; + parser->fd = fd; + parser->fp_input = fp_input; + parser->fp_output = fp_output; #ifdef ENABLE_HTTPS parser->ssl_output = ssl; #endif @@ -222,19 +223,21 @@ int parser_next(PARSER *parser) } if (unlikely(parser->read_function)) - tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->input); + tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->fp_input); + else if(likely(parser->fp_input)) + tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->fp_input); else - tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->input); + tmp = NULL; if (unlikely(!tmp)) { if (unlikely(parser->eof_function)) { - int rc = parser->eof_function(parser->input); + int rc = parser->eof_function(parser->fp_input); error("read failed: user defined function returned %d", rc); } else { - if (feof((FILE *)parser->input)) + if (feof((FILE *)parser->fp_input)) error("read failed: end of file"); - else if (ferror((FILE *)parser->input)) + else if (ferror((FILE *)parser->fp_input)) error("read failed: input error"); else error("read failed: unknown error"); @@ -253,6 +256,8 @@ int parser_next(PARSER *parser) inline int parser_action(PARSER *parser, char *input) { + parser->line++; + PARSER_RC rc = PARSER_RC_OK; char *words[PLUGINSD_MAX_WORDS]; char command[PLUGINSD_LINE_MAX + 1]; @@ -288,7 +293,7 @@ inline int parser_action(PARSER *parser, char *input) if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) { // more than 10MB of data // a bad plugin that did not send the end_keyword - internal_error(true, "Deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); + internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); return 1; } } @@ -321,11 +326,10 @@ inline int parser_action(PARSER *parser, char *input) size_t worker_job_id = WORKER_UTILIZATION_MAX_JOB_TYPES + 1; // set an invalid value by default while(tmp_keyword) { - if (command_hash == tmp_keyword->keyword_hash && - (!strcmp(command, tmp_keyword->keyword))) { - action_function_list = &tmp_keyword->func[0]; - worker_job_id = tmp_keyword->worker_job_id; - break; + if (command_hash == tmp_keyword->keyword_hash && (!strcmp(command, tmp_keyword->keyword))) { + action_function_list = &tmp_keyword->func[0]; + worker_job_id = tmp_keyword->worker_job_id; + break; } tmp_keyword = tmp_keyword->next; } @@ -335,17 +339,14 @@ inline int parser_action(PARSER *parser, char *input) rc = parser->unknown_function(words, num_words, parser->user); else rc = PARSER_RC_ERROR; - - internal_error(rc != PARSER_RC_OK, "Unknown keyword [%s]", input); } else { worker_is_busy(worker_job_id); while ((action_function = *action_function_list) != NULL) { rc = action_function(words, num_words, parser->user); - if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP)) { - internal_error(true, "action_function() failed with rc = %u", rc); + if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP)) break; - } + action_function_list++; } worker_is_idle(); @@ -354,7 +355,25 @@ inline int parser_action(PARSER *parser, char *input) if (likely(input == parser->buffer)) parser->flags |= PARSER_INPUT_PROCESSED; - internal_error(rc == PARSER_RC_ERROR, "parser_action() failed."); +#ifdef NETDATA_INTERNAL_CHECKS + if(rc == PARSER_RC_ERROR) { + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX); + for(size_t i = 0; i < num_words ;i++) { + if(i) buffer_fast_strcat(wb, " ", 1); + + buffer_fast_strcat(wb, "\"", 1); + const char *s = get_word(words, num_words, i); + buffer_strcat(wb, s?s:""); + buffer_fast_strcat(wb, "\"", 1); + } + + internal_error(true, "PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)", + command, parser->line, buffer_tostring(wb)); + + buffer_free(wb); + } +#endif + return (rc == PARSER_RC_ERROR); } diff --git a/parser/parser.h b/parser/parser.h index c8a89a59d0..ad74883892 100644 --- a/parser/parser.h +++ b/parser/parser.h @@ -7,7 +7,10 @@ #define PARSER_MAX_CALLBACKS 20 #define PARSER_MAX_RECOVER_KEYWORDS 128 -#define WORKER_PARSER_FIRST_JOB 1 +#define WORKER_PARSER_FIRST_JOB 3 + +// this has to be in-sync with the same at receiver.c +#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3) // PARSER return codes typedef enum parser_rc { @@ -47,8 +50,9 @@ typedef struct parser { size_t worker_job_next_id; uint8_t version; // Parser version RRDHOST *host; - void *input; // Input source e.g. stream - void *output; // Stream to send commands to plugin + int fd; // Socket + FILE *fp_input; // Input source e.g. stream + FILE *fp_output; // Stream to send commands to plugin #ifdef ENABLE_HTTPS struct netdata_ssl *ssl_output; #endif @@ -56,6 +60,7 @@ typedef struct parser { PARSER_KEYWORD *keyword; // List of parse keywords and functions void *user; // User defined structure to hold extra state between calls uint32_t flags; + size_t line; char *(*read_function)(char *buffer, long unsigned int, void *input); int (*eof_function)(void *input); @@ -85,7 +90,7 @@ typedef struct parser { int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char)); -PARSER *parser_init(RRDHOST *host, void *user, void *input, void *output, PARSER_INPUT_TYPE flags, void *ssl); +PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl); int parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func); int parser_next(PARSER *working_parser); int parser_action(PARSER *working_parser, char *input); |