summaryrefslogtreecommitdiffstats
path: root/parser
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-20 23:47:53 +0200
committerGitHub <noreply@github.com>2022-11-20 23:47:53 +0200
commit284f6f3aa4f36cefad2601c490510621496c2b53 (patch)
tree97a7d55627ef7477f431c53a20d0e6f1f738a419 /parser
parent2d02484954f68bf7e3015cb649e2f10a9f3c5c95 (diff)
streaming compression, query planner and replication fixes (#14023)
* streaming compression, query planner and replication fixes * remove journal v2 stats from global statistics * disable sql for checking past sql UUIDs * single threaded replication * final replication thread using dictionaries and JudyL for sorting the pending requests * do not timeout the sending socket when there are pending replication requests * streaming receiver using read() instead of fread() * remove FILE * from streaming - now using posix read() and write() * increase timeouts to 10 minutes * apply sender timeout only when there are metrics that are supposed to be streamed * error handling in replication * remove retries on socket read timeout; better error messages * take into account inbound traffic too to detect that a connection is stale * remove race conditions from replication thread * make sure deleted entries are marked as executed, so that even if deletion fails, they will not be executed * 2 minutes timeout to retry streaming to a parent that already has this node * remove unecessary condition check * fix compilation warnings * include judy in replication * wrappers to handle retries for SSL_read and SSL_write * compressed bytes read monitoring * recursive locks on replication to make it faster during flush or cleanup * replication completion chart at the receiver side * simplified recursive mutex * simplified recursive mutex again
Diffstat (limited to 'parser')
-rw-r--r--parser/parser.c59
-rw-r--r--parser/parser.h13
2 files changed, 48 insertions, 24 deletions
diff --git a/parser/parser.c b/parser/parser.c
index 3eeba8816a..5b4c528de1 100644
--- a/parser/parser.c
+++ b/parser/parser.c
@@ -29,14 +29,15 @@ inline int find_first_keyword(const char *str, char *keyword, int max_size, int
*
*/
-PARSER *parser_init(RRDHOST *host, void *user, void *input, void *output, PARSER_INPUT_TYPE flags, void *ssl __maybe_unused)
+PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl __maybe_unused)
{
PARSER *parser;
parser = callocz(1, sizeof(*parser));
parser->user = user;
- parser->input = input;
- parser->output = output;
+ parser->fd = fd;
+ parser->fp_input = fp_input;
+ parser->fp_output = fp_output;
#ifdef ENABLE_HTTPS
parser->ssl_output = ssl;
#endif
@@ -222,19 +223,21 @@ int parser_next(PARSER *parser)
}
if (unlikely(parser->read_function))
- tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->input);
+ tmp = parser->read_function(parser->buffer, PLUGINSD_LINE_MAX, parser->fp_input);
+ else if(likely(parser->fp_input))
+ tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->fp_input);
else
- tmp = fgets(parser->buffer, PLUGINSD_LINE_MAX, (FILE *)parser->input);
+ tmp = NULL;
if (unlikely(!tmp)) {
if (unlikely(parser->eof_function)) {
- int rc = parser->eof_function(parser->input);
+ int rc = parser->eof_function(parser->fp_input);
error("read failed: user defined function returned %d", rc);
}
else {
- if (feof((FILE *)parser->input))
+ if (feof((FILE *)parser->fp_input))
error("read failed: end of file");
- else if (ferror((FILE *)parser->input))
+ else if (ferror((FILE *)parser->fp_input))
error("read failed: input error");
else
error("read failed: unknown error");
@@ -253,6 +256,8 @@ int parser_next(PARSER *parser)
inline int parser_action(PARSER *parser, char *input)
{
+ parser->line++;
+
PARSER_RC rc = PARSER_RC_OK;
char *words[PLUGINSD_MAX_WORDS];
char command[PLUGINSD_LINE_MAX + 1];
@@ -288,7 +293,7 @@ inline int parser_action(PARSER *parser, char *input)
if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) {
// more than 10MB of data
// a bad plugin that did not send the end_keyword
- internal_error(true, "Deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response));
+ internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response));
return 1;
}
}
@@ -321,11 +326,10 @@ inline int parser_action(PARSER *parser, char *input)
size_t worker_job_id = WORKER_UTILIZATION_MAX_JOB_TYPES + 1; // set an invalid value by default
while(tmp_keyword) {
- if (command_hash == tmp_keyword->keyword_hash &&
- (!strcmp(command, tmp_keyword->keyword))) {
- action_function_list = &tmp_keyword->func[0];
- worker_job_id = tmp_keyword->worker_job_id;
- break;
+ if (command_hash == tmp_keyword->keyword_hash && (!strcmp(command, tmp_keyword->keyword))) {
+ action_function_list = &tmp_keyword->func[0];
+ worker_job_id = tmp_keyword->worker_job_id;
+ break;
}
tmp_keyword = tmp_keyword->next;
}
@@ -335,17 +339,14 @@ inline int parser_action(PARSER *parser, char *input)
rc = parser->unknown_function(words, num_words, parser->user);
else
rc = PARSER_RC_ERROR;
-
- internal_error(rc != PARSER_RC_OK, "Unknown keyword [%s]", input);
}
else {
worker_is_busy(worker_job_id);
while ((action_function = *action_function_list) != NULL) {
rc = action_function(words, num_words, parser->user);
- if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP)) {
- internal_error(true, "action_function() failed with rc = %u", rc);
+ if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP))
break;
- }
+
action_function_list++;
}
worker_is_idle();
@@ -354,7 +355,25 @@ inline int parser_action(PARSER *parser, char *input)
if (likely(input == parser->buffer))
parser->flags |= PARSER_INPUT_PROCESSED;
- internal_error(rc == PARSER_RC_ERROR, "parser_action() failed.");
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(rc == PARSER_RC_ERROR) {
+ BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX);
+ for(size_t i = 0; i < num_words ;i++) {
+ if(i) buffer_fast_strcat(wb, " ", 1);
+
+ buffer_fast_strcat(wb, "\"", 1);
+ const char *s = get_word(words, num_words, i);
+ buffer_strcat(wb, s?s:"");
+ buffer_fast_strcat(wb, "\"", 1);
+ }
+
+ internal_error(true, "PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)",
+ command, parser->line, buffer_tostring(wb));
+
+ buffer_free(wb);
+ }
+#endif
+
return (rc == PARSER_RC_ERROR);
}
diff --git a/parser/parser.h b/parser/parser.h
index c8a89a59d0..ad74883892 100644
--- a/parser/parser.h
+++ b/parser/parser.h
@@ -7,7 +7,10 @@
#define PARSER_MAX_CALLBACKS 20
#define PARSER_MAX_RECOVER_KEYWORDS 128
-#define WORKER_PARSER_FIRST_JOB 1
+#define WORKER_PARSER_FIRST_JOB 3
+
+// this has to be in-sync with the same at receiver.c
+#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
// PARSER return codes
typedef enum parser_rc {
@@ -47,8 +50,9 @@ typedef struct parser {
size_t worker_job_next_id;
uint8_t version; // Parser version
RRDHOST *host;
- void *input; // Input source e.g. stream
- void *output; // Stream to send commands to plugin
+ int fd; // Socket
+ FILE *fp_input; // Input source e.g. stream
+ FILE *fp_output; // Stream to send commands to plugin
#ifdef ENABLE_HTTPS
struct netdata_ssl *ssl_output;
#endif
@@ -56,6 +60,7 @@ typedef struct parser {
PARSER_KEYWORD *keyword; // List of parse keywords and functions
void *user; // User defined structure to hold extra state between calls
uint32_t flags;
+ size_t line;
char *(*read_function)(char *buffer, long unsigned int, void *input);
int (*eof_function)(void *input);
@@ -85,7 +90,7 @@ typedef struct parser {
int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char));
-PARSER *parser_init(RRDHOST *host, void *user, void *input, void *output, PARSER_INPUT_TYPE flags, void *ssl);
+PARSER *parser_init(RRDHOST *host, void *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl);
int parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func);
int parser_next(PARSER *working_parser);
int parser_action(PARSER *working_parser, char *input);