author     Costa Tsaousis <costa@netdata.cloud>   2023-09-21 19:05:30 +0300
committer  GitHub <noreply@github.com>            2023-09-21 19:05:30 +0300
commit     060f3ff7bf5b91899280ceaefaddf7ff4016a1f6 (patch)
tree       bc22bae023d952377076d39848703e829676d7b7
parent     6403d115235e0ef58cfb6d977aee9ea3dc0143b1 (diff)
remove the line length limit from pluginsd (#16013)

* remove the line length limit from pluginsd
* initialize the buffer on every iteration
* buffer_tostring inlined
* Release buffer

---------

Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
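The bullets above describe the new pattern only in prose; a minimal, hypothetical sketch of the consumer loop they lead to is shown below (the reduced BUFFER struct and the consume_lines/next_line/read_more/parse names are illustrative stand-ins, not the real libnetdata API):

    #include <stdbool.h>
    #include <stddef.h>

    /* reduced stand-in for libnetdata's BUFFER; growth is elided here,
       the real buffered_reader_next_line() grows it with buffer_need_bytes() */
    typedef struct { char *buffer; size_t len; } BUFFER;

    /* shape of the reworked loops in pluginsd_process() and streaming_parser() */
    static void consume_lines(BUFFER *line,
                              bool (*next_line)(BUFFER *dst),   /* like buffered_reader_next_line() */
                              bool (*read_more)(void),          /* like buffered_reader_read_timeout() */
                              bool (*parse)(const char *text))  /* like parser_action() */
    {
        for(;;) {
            if(!next_line(line)) {
                if(!read_more())              /* timeout or EOF: stop */
                    break;

                continue;                     /* more input arrived, try to assemble a line again */
            }

            if(parse(line->buffer))           /* the parser asked us to stop */
                break;

            line->len = 0;                    /* initialize the buffer on every iteration */
            line->buffer[0] = '\0';
        }
    }

Because each line is accumulated in a growable BUFFER instead of a fixed char buffer[PLUGINSD_LINE_MAX + 2], a single protocol line is no longer capped at PLUGINSD_LINE_MAX.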
-rw-r--r--  collectors/plugins.d/pluginsd_parser.c                13
-rw-r--r--  collectors/plugins.d/pluginsd_parser.h                 6
-rw-r--r--  collectors/systemd-journal.plugin/systemd-journal.c    2
-rw-r--r--  libnetdata/buffer/buffer.c                            10
-rw-r--r--  libnetdata/buffer/buffer.h                            11
-rw-r--r--  streaming/receiver.c                                  48
-rw-r--r--  streaming/rrdpush.h                                    2
7 files changed, 47 insertions, 45 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 82c36219a0..fbcd812627 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -2352,15 +2352,22 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
buffered_reader_init(&parser->reader);
- char buffer[PLUGINSD_LINE_MAX + 2];
+ BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
while(likely(service_running(SERVICE_COLLECTORS))) {
- if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) {
+ if (unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC)))
break;
+
+ continue;
}
- else if(unlikely(parser_action(parser, buffer)))
+
+ if(unlikely(parser_action(parser, buffer->buffer)))
break;
+
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
}
+ buffer_free(buffer);
cd->unsafe.enabled = parser->user.enabled;
count = parser->user.data_collections_count;
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index 9fdb5846dd..74767569b9 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -154,8 +154,8 @@ static inline int parser_action(PARSER *parser, char *input) {
parser->line++;
if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
- char command[PLUGINSD_LINE_MAX + 1];
- bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, isspace_map_pluginsd);
+ char command[100 + 1];
+ bool has_keyword = find_first_keyword(input, command, 100, isspace_map_pluginsd);
if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) {
if(parser->defer.response) {
@@ -183,7 +183,7 @@ static inline int parser_action(PARSER *parser, char *input) {
return 0;
}
- char *words[PLUGINSD_MAX_WORDS];
+ static __thread char *words[PLUGINSD_MAX_WORDS];
size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
const char *command = get_word(words, num_words, 0);
diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c
index 522c127bde..bfa51393fb 100644
--- a/collectors/systemd-journal.plugin/systemd-journal.c
+++ b/collectors/systemd-journal.plugin/systemd-journal.c
@@ -690,7 +690,7 @@ static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused,
static void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) {
BUFFER *wb = buffer_create(0, NULL);
buffer_flush(wb);
- buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAY_ITEMS);
+ buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
FACETS *facets = facets_create(50, FACETS_OPTION_ALL_KEYS_FTS,
SYSTEMD_ALWAYS_VISIBLE_KEYS,
diff --git a/libnetdata/buffer/buffer.c b/libnetdata/buffer/buffer.c
index 2d09bb1ff6..feee72fcf5 100644
--- a/libnetdata/buffer/buffer.c
+++ b/libnetdata/buffer/buffer.c
@@ -20,16 +20,6 @@ void buffer_reset(BUFFER *wb) {
buffer_overflow_check(wb);
}
-const char *buffer_tostring(BUFFER *wb)
-{
- buffer_need_bytes(wb, 1);
- wb->buffer[wb->len] = '\0';
-
- buffer_overflow_check(wb);
-
- return(wb->buffer);
-}
-
void buffer_char_replace(BUFFER *wb, char from, char to) {
char *s = wb->buffer, *end = &wb->buffer[wb->len];
diff --git a/libnetdata/buffer/buffer.h b/libnetdata/buffer/buffer.h
index 0e3de92473..26efe00709 100644
--- a/libnetdata/buffer/buffer.h
+++ b/libnetdata/buffer/buffer.h
@@ -97,7 +97,6 @@ typedef struct web_buffer {
#define buffer_no_cacheable(wb) do { (wb)->options |= WB_CONTENT_NO_CACHEABLE; if((wb)->options & WB_CONTENT_CACHEABLE) (wb)->options &= ~WB_CONTENT_CACHEABLE; (wb)->expires = 0; } while(0)
#define buffer_strlen(wb) ((wb)->len)
-const char *buffer_tostring(BUFFER *wb);
#define BUFFER_OVERFLOW_EOF "EOF"
@@ -158,6 +157,16 @@ void buffer_json_initialize(BUFFER *wb, const char *key_quote, const char *value
void buffer_json_finalize(BUFFER *wb);
+static const char *buffer_tostring(BUFFER *wb)
+{
+ buffer_need_bytes(wb, 1);
+ wb->buffer[wb->len] = '\0';
+
+ buffer_overflow_check(wb);
+
+ return(wb->buffer);
+}
+
static inline void _buffer_json_depth_push(BUFFER *wb, BUFFER_JSON_NODE_TYPE type) {
#ifdef NETDATA_INTERNAL_CHECKS
assert(wb->json.depth <= BUFFER_JSON_MAX_DEPTH && "BUFFER JSON: max nesting reached");
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 3ff022e973..10ef8b7d3f 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -226,53 +226,47 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
*/
-inline char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size) {
+inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) {
+ buffer_need_bytes(dst, reader->read_len - reader->pos + 2);
+
size_t start = reader->pos;
char *ss = &reader->read_buffer[start];
char *se = &reader->read_buffer[reader->read_len];
- char *ds = dst;
- char *de = &dst[dst_size - 2];
+ char *ds = &dst->buffer[dst->len];
+ char *de = &ds[dst->size - dst->len - 2];
if(ss >= se) {
*ds = '\0';
reader->pos = 0;
reader->read_len = 0;
reader->read_buffer[reader->read_len] = '\0';
- return NULL;
+ return false;
}
// copy all bytes to buffer
- while(ss < se && ds < de && *ss != '\n')
+ while(ss < se && ds < de && *ss != '\n') {
*ds++ = *ss++;
+ dst->len++;
+ }
// if we have a newline, return the buffer
if(ss < se && ds < de && *ss == '\n') {
// newline found in the r->read_buffer
*ds++ = *ss++; // copy the newline too
- *ds = '\0';
+ dst->len++;
- reader->pos = ss - reader->read_buffer;
- return dst;
- }
-
- // if the destination is full, oops!
- if(ds == de) {
- netdata_log_error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX);
*ds = '\0';
+
reader->pos = ss - reader->read_buffer;
- return dst;
+ return true;
}
- // no newline found in the r->read_buffer
- // move everything to the beginning
- memmove(reader->read_buffer, &reader->read_buffer[start], reader->read_len - start);
- reader->read_len -= (int)start;
- reader->read_buffer[reader->read_len] = '\0';
- *ds = '\0';
reader->pos = 0;
- return NULL;
+ reader->read_len = 0;
+ reader->read_buffer[reader->read_len] = '\0';
+ return false;
}
bool plugin_is_enabled(struct plugind *cd);
@@ -342,10 +336,10 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
buffered_reader_init(&rpt->reader);
- char buffer[PLUGINSD_LINE_MAX + 2] = "";
+ BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
while(!receiver_should_stop(rpt)) {
- if(!buffered_reader_next_line(&rpt->reader, buffer, PLUGINSD_LINE_MAX + 2)) {
+ if(!buffered_reader_next_line(&rpt->reader, buffer)) {
bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt);
if(unlikely(!have_new_data)) {
@@ -356,13 +350,15 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
continue;
}
- if (unlikely(parser_action(parser, buffer))) {
- internal_error(true, "parser_action() failed on keyword '%s'.", buffer);
+ if (unlikely(parser_action(parser, buffer->buffer))) {
receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
break;
}
- }
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ buffer_free(buffer);
result = parser->user.data_collections_count;
// free parser with the pop function
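For context on the new buffered_reader_next_line() contract in this file (a partial line is kept in the caller's BUFFER and the function keeps returning false until a '\n' completes it), a tiny self-contained model follows; the reader/growbuf structs are hypothetical stand-ins, and it assumes the destination is already large enough, where the real code grows it with buffer_need_bytes():

    #include <stdbool.h>
    #include <stddef.h>

    struct reader  { char data[256]; size_t pos, len; };   /* stand-in for struct buffered_reader */
    struct growbuf { char *buffer; size_t len; };          /* stand-in for BUFFER, assumed big enough */

    static bool next_line_sketch(struct reader *r, struct growbuf *dst) {
        while(r->pos < r->len) {
            char c = r->data[r->pos++];
            dst->buffer[dst->len++] = c;          /* copy byte by byte, including the newline */

            if(c == '\n') {                       /* a complete line is now in dst */
                dst->buffer[dst->len] = '\0';
                return true;
            }
        }

        /* input drained without a newline: keep the partial line in dst,
           reset the read buffer, and let the caller fetch more data */
        r->pos = r->len = 0;
        dst->buffer[dst->len] = '\0';
        return false;
    }

The practical difference from the replaced code above is that the "received line exceeds %d bytes. Truncating it." path is gone: an over-long line simply keeps accumulating in dst until its newline arrives.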
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 09df8e711f..f8d6926039 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -357,7 +357,7 @@ struct buffered_reader {
char read_buffer[PLUGINSD_LINE_MAX + 1];
};
-char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size);
+bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst);
static inline void buffered_reader_init(struct buffered_reader *reader) {
reader->read_buffer[0] = '\0';
reader->read_len = 0;