From 060f3ff7bf5b91899280ceaefaddf7ff4016a1f6 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Thu, 21 Sep 2023 19:05:30 +0300 Subject: remove the line length limit from pluginsd (#16013) * remove the line length limit from pluginsd * initialize the buffer on every iteration * buffer_tostring inlined * Release buffer --------- Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> --- collectors/plugins.d/pluginsd_parser.c | 13 ++++-- collectors/plugins.d/pluginsd_parser.h | 6 +-- .../systemd-journal.plugin/systemd-journal.c | 2 +- libnetdata/buffer/buffer.c | 10 ----- libnetdata/buffer/buffer.h | 11 ++++- streaming/receiver.c | 48 ++++++++++------------ streaming/rrdpush.h | 2 +- 7 files changed, 47 insertions(+), 45 deletions(-) diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 82c36219a0..fbcd812627 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -2352,15 +2352,22 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); buffered_reader_init(&parser->reader); - char buffer[PLUGINSD_LINE_MAX + 2]; + BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL); while(likely(service_running(SERVICE_COLLECTORS))) { - if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) { + if (unlikely(!buffered_reader_next_line(&parser->reader, buffer))) { if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC))) break; + + continue; } - else if(unlikely(parser_action(parser, buffer))) + + if(unlikely(parser_action(parser, buffer->buffer))) break; + + buffer->len = 0; + buffer->buffer[0] = '\0'; } + buffer_free(buffer); cd->unsafe.enabled = parser->user.enabled; count = parser->user.data_collections_count; diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h index 9fdb5846dd..74767569b9 100644 --- a/collectors/plugins.d/pluginsd_parser.h +++ b/collectors/plugins.d/pluginsd_parser.h @@ -154,8 +154,8 @@ static inline int parser_action(PARSER *parser, char *input) { parser->line++; if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { - char command[PLUGINSD_LINE_MAX + 1]; - bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, isspace_map_pluginsd); + char command[100 + 1]; + bool has_keyword = find_first_keyword(input, command, 100, isspace_map_pluginsd); if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { if(parser->defer.response) { @@ -183,7 +183,7 @@ static inline int parser_action(PARSER *parser, char *input) { return 0; } - char *words[PLUGINSD_MAX_WORDS]; + static __thread char *words[PLUGINSD_MAX_WORDS]; size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS); const char *command = get_word(words, num_words, 0); diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c index 522c127bde..bfa51393fb 100644 --- a/collectors/systemd-journal.plugin/systemd-journal.c +++ b/collectors/systemd-journal.plugin/systemd-journal.c @@ -690,7 +690,7 @@ static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, static void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) { BUFFER *wb = buffer_create(0, NULL); buffer_flush(wb); - buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAY_ITEMS); + buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY); FACETS *facets = facets_create(50, FACETS_OPTION_ALL_KEYS_FTS, SYSTEMD_ALWAYS_VISIBLE_KEYS, diff --git a/libnetdata/buffer/buffer.c b/libnetdata/buffer/buffer.c index 2d09bb1ff6..feee72fcf5 100644 --- a/libnetdata/buffer/buffer.c +++ b/libnetdata/buffer/buffer.c @@ -20,16 +20,6 @@ void buffer_reset(BUFFER *wb) { buffer_overflow_check(wb); } -const char *buffer_tostring(BUFFER *wb) -{ - buffer_need_bytes(wb, 1); - wb->buffer[wb->len] = '\0'; - - buffer_overflow_check(wb); - - return(wb->buffer); -} - void buffer_char_replace(BUFFER *wb, char from, char to) { char *s = wb->buffer, *end = &wb->buffer[wb->len]; diff --git a/libnetdata/buffer/buffer.h b/libnetdata/buffer/buffer.h index 0e3de92473..26efe00709 100644 --- a/libnetdata/buffer/buffer.h +++ b/libnetdata/buffer/buffer.h @@ -97,7 +97,6 @@ typedef struct web_buffer { #define buffer_no_cacheable(wb) do { (wb)->options |= WB_CONTENT_NO_CACHEABLE; if((wb)->options & WB_CONTENT_CACHEABLE) (wb)->options &= ~WB_CONTENT_CACHEABLE; (wb)->expires = 0; } while(0) #define buffer_strlen(wb) ((wb)->len) -const char *buffer_tostring(BUFFER *wb); #define BUFFER_OVERFLOW_EOF "EOF" @@ -158,6 +157,16 @@ void buffer_json_initialize(BUFFER *wb, const char *key_quote, const char *value void buffer_json_finalize(BUFFER *wb); +static const char *buffer_tostring(BUFFER *wb) +{ + buffer_need_bytes(wb, 1); + wb->buffer[wb->len] = '\0'; + + buffer_overflow_check(wb); + + return(wb->buffer); +} + static inline void _buffer_json_depth_push(BUFFER *wb, BUFFER_JSON_NODE_TYPE type) { #ifdef NETDATA_INTERNAL_CHECKS assert(wb->json.depth <= BUFFER_JSON_MAX_DEPTH && "BUFFER JSON: max nesting reached"); diff --git a/streaming/receiver.c b/streaming/receiver.c index 3ff022e973..10ef8b7d3f 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -226,53 +226,47 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. */ -inline char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size) { +inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) { + buffer_need_bytes(dst, reader->read_len - reader->pos + 2); + size_t start = reader->pos; char *ss = &reader->read_buffer[start]; char *se = &reader->read_buffer[reader->read_len]; - char *ds = dst; - char *de = &dst[dst_size - 2]; + char *ds = &dst->buffer[dst->len]; + char *de = &ds[dst->size - dst->len - 2]; if(ss >= se) { *ds = '\0'; reader->pos = 0; reader->read_len = 0; reader->read_buffer[reader->read_len] = '\0'; - return NULL; + return false; } // copy all bytes to buffer - while(ss < se && ds < de && *ss != '\n') + while(ss < se && ds < de && *ss != '\n') { *ds++ = *ss++; + dst->len++; + } // if we have a newline, return the buffer if(ss < se && ds < de && *ss == '\n') { // newline found in the r->read_buffer *ds++ = *ss++; // copy the newline too - *ds = '\0'; + dst->len++; - reader->pos = ss - reader->read_buffer; - return dst; - } - - // if the destination is full, oops! - if(ds == de) { - netdata_log_error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX); *ds = '\0'; + reader->pos = ss - reader->read_buffer; - return dst; + return true; } - // no newline found in the r->read_buffer - // move everything to the beginning - memmove(reader->read_buffer, &reader->read_buffer[start], reader->read_len - start); - reader->read_len -= (int)start; - reader->read_buffer[reader->read_len] = '\0'; - *ds = '\0'; reader->pos = 0; - return NULL; + reader->read_len = 0; + reader->read_buffer[reader->read_len] = '\0'; + return false; } bool plugin_is_enabled(struct plugind *cd); @@ -342,10 +336,10 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i buffered_reader_init(&rpt->reader); - char buffer[PLUGINSD_LINE_MAX + 2] = ""; + BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); while(!receiver_should_stop(rpt)) { - if(!buffered_reader_next_line(&rpt->reader, buffer, PLUGINSD_LINE_MAX + 2)) { + if(!buffered_reader_next_line(&rpt->reader, buffer)) { bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt); if(unlikely(!have_new_data)) { @@ -356,13 +350,15 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i continue; } - if (unlikely(parser_action(parser, buffer))) { - internal_error(true, "parser_action() failed on keyword '%s'.", buffer); + if (unlikely(parser_action(parser, buffer->buffer))) { receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); break; } - } + buffer->len = 0; + buffer->buffer[0] = '\0'; + } + buffer_free(buffer); result = parser->user.data_collections_count; // free parser with the pop function diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 09df8e711f..f8d6926039 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -357,7 +357,7 @@ struct buffered_reader { char read_buffer[PLUGINSD_LINE_MAX + 1]; }; -char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size); +bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst); static inline void buffered_reader_init(struct buffered_reader *reader) { reader->read_buffer[0] = '\0'; reader->read_len = 0; -- cgit v1.2.3